Merge branch 'stats_v3' of github.com:yenicelik/web3-proxy into Web3ProxyError
This commit is contained in:
commit
ffdf25787f
430
Cargo.lock
generated
430
Cargo.lock
generated
@ -292,9 +292,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
|
||||
|
||||
[[package]]
|
||||
name = "axum"
|
||||
version = "0.6.8"
|
||||
version = "0.6.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2bd379e511536bad07447f899300aa526e9bae8e6f66dc5e5ca45d7587b7c1ec"
|
||||
checksum = "8582122b8edba2af43eaf6b80dbfd33f421b5a0eb3a3113d21bc096ac5b44faf"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"axum-core",
|
||||
@ -322,7 +322,7 @@ dependencies = [
|
||||
"tokio",
|
||||
"tokio-tungstenite 0.18.0",
|
||||
"tower",
|
||||
"tower-http 0.3.5",
|
||||
"tower-http",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
]
|
||||
@ -340,9 +340,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "axum-core"
|
||||
version = "0.3.2"
|
||||
version = "0.3.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1cae3e661676ffbacb30f1a824089a8c9150e71017f7e1e38f2aa32009188d34"
|
||||
checksum = "b2f958c80c248b34b9a877a643811be8dbca03ca5ba827f2b63baf3a81e5fc4e"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bytes",
|
||||
@ -357,9 +357,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "axum-macros"
|
||||
version = "0.3.4"
|
||||
version = "0.3.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5fbf955307ff8addb48d2399393c9e2740dd491537ec562b66ab364fc4a38841"
|
||||
checksum = "404e816a138c27c29f7428ae9b1816ab880ba6923fa76a9f15296af79444a8dc"
|
||||
dependencies = [
|
||||
"heck 0.4.1",
|
||||
"proc-macro2",
|
||||
@ -735,6 +735,12 @@ dependencies = [
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cassowary"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "df8670b8c7b9dae1793364eafadf7239c40d669904660c5960d74cfd80b46a53"
|
||||
|
||||
[[package]]
|
||||
name = "cc"
|
||||
version = "1.0.79"
|
||||
@ -802,6 +808,15 @@ dependencies = [
|
||||
"textwrap",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap_complete"
|
||||
version = "3.2.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3f7a2e0a962c45ce25afce14220bc24f9dade0a1787f185cecf96bfba7847cd8"
|
||||
dependencies = [
|
||||
"clap",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap_derive"
|
||||
version = "3.2.18"
|
||||
@ -891,6 +906,34 @@ dependencies = [
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "color-eyre"
|
||||
version = "0.5.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1f1885697ee8a177096d42f158922251a41973117f6d8a234cee94b9509157b7"
|
||||
dependencies = [
|
||||
"backtrace",
|
||||
"color-spantrace",
|
||||
"eyre",
|
||||
"indenter",
|
||||
"once_cell",
|
||||
"owo-colors",
|
||||
"tracing-error",
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "color-spantrace"
|
||||
version = "0.1.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b6eee477a4a8a72f4addd4de416eb56d54bc307b284d6601bafdee1f4ea462d1"
|
||||
dependencies = [
|
||||
"once_cell",
|
||||
"owo-colors",
|
||||
"tracing-core",
|
||||
"tracing-error",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "combine"
|
||||
version = "4.6.6"
|
||||
@ -941,6 +984,42 @@ dependencies = [
|
||||
"windows-sys 0.42.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "console-api"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e57ff02e8ad8e06ab9731d5dc72dc23bef9200778eae1a89d555d8c42e5d4a86"
|
||||
dependencies = [
|
||||
"prost",
|
||||
"prost-types",
|
||||
"tonic",
|
||||
"tracing-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "console-subscriber"
|
||||
version = "0.1.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "22a3a81dfaf6b66bce5d159eddae701e3a002f194d378cbf7be5f053c281d9be"
|
||||
dependencies = [
|
||||
"console-api",
|
||||
"crossbeam-channel",
|
||||
"crossbeam-utils",
|
||||
"futures",
|
||||
"hdrhistogram",
|
||||
"humantime",
|
||||
"prost-types",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"thread_local",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tonic",
|
||||
"tracing",
|
||||
"tracing-core",
|
||||
"tracing-subscriber 0.3.16",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "const-oid"
|
||||
version = "0.7.1"
|
||||
@ -1076,6 +1155,32 @@ dependencies = [
|
||||
"cfg-if",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossterm"
|
||||
version = "0.20.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c0ebde6a9dd5e331cd6c6f48253254d117642c31653baa475e394657c59c1f7d"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"crossterm_winapi",
|
||||
"futures-core",
|
||||
"libc",
|
||||
"mio 0.7.14",
|
||||
"parking_lot 0.11.2",
|
||||
"signal-hook",
|
||||
"signal-hook-mio",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossterm_winapi"
|
||||
version = "0.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3a6966607622438301997d3dac0d2f6e9a90c68bb6bc1785ea98456ab93c0507"
|
||||
dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crunchy"
|
||||
version = "0.2.2"
|
||||
@ -1335,6 +1440,15 @@ dependencies = [
|
||||
"subtle",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dirs"
|
||||
version = "4.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059"
|
||||
dependencies = [
|
||||
"dirs-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dirs-next"
|
||||
version = "2.0.0"
|
||||
@ -1345,6 +1459,17 @@ dependencies = [
|
||||
"dirs-sys-next",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dirs-sys"
|
||||
version = "0.3.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"redox_users",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dirs-sys-next"
|
||||
version = "0.1.2"
|
||||
@ -2215,9 +2340,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "h2"
|
||||
version = "0.3.15"
|
||||
version = "0.3.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5f9f29bc9dda355256b2916cf526ab02ce0aeaaaf2bad60d65ef3f12f11dd0f4"
|
||||
checksum = "5be7b54589b581f624f566bf5d8eb2bab1db736c51528720b6bd36b96b55924d"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fnv",
|
||||
@ -2504,6 +2629,18 @@ dependencies = [
|
||||
"tokio-rustls",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hyper-timeout"
|
||||
version = "0.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1"
|
||||
dependencies = [
|
||||
"hyper",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
"tokio-io-timeout",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hyper-tls"
|
||||
version = "0.5.0"
|
||||
@ -2759,9 +2896,9 @@ checksum = "fad582f4b9e86b6caa621cabeb0963332d92eea04729ab12892c2533951e6440"
|
||||
|
||||
[[package]]
|
||||
name = "jobserver"
|
||||
version = "0.1.25"
|
||||
version = "0.1.26"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "068b1ee6743e4d11fb9c6a1e6064b3693a1b600e7f5f5988047d98b3dc9fb90b"
|
||||
checksum = "936cfd212a0155903bcbc060e316fb6cc7cbf2e1907329391ebadc1fe0ce77c2"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
@ -2979,6 +3116,19 @@ dependencies = [
|
||||
"adler",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mio"
|
||||
version = "0.7.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8067b404fe97c70829f082dec8bcf4f71225d7eaea1d8645349cb76fa06205cc"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"log",
|
||||
"miow",
|
||||
"ntapi",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mio"
|
||||
version = "0.8.6"
|
||||
@ -2991,6 +3141,15 @@ dependencies = [
|
||||
"windows-sys 0.45.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "miow"
|
||||
version = "0.3.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21"
|
||||
dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "moka"
|
||||
version = "0.10.0"
|
||||
@ -3074,6 +3233,25 @@ version = "0.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e9e591e719385e6ebaeb5ce5d3887f7d5676fceca6411d1925ccc95745f3d6f7"
|
||||
|
||||
[[package]]
|
||||
name = "ntapi"
|
||||
version = "0.3.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c28774a7fd2fbb4f0babd8237ce554b73af68021b5f695a3cebd6c59bac0980f"
|
||||
dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nu-ansi-term"
|
||||
version = "0.46.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
|
||||
dependencies = [
|
||||
"overload",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num"
|
||||
version = "0.4.0"
|
||||
@ -3371,6 +3549,18 @@ dependencies = [
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "overload"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
|
||||
|
||||
[[package]]
|
||||
name = "owo-colors"
|
||||
version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2386b4ebe91c2f7f51082d4cefa145d030e33a1842a96b12e4885cc3c01f7a55"
|
||||
|
||||
[[package]]
|
||||
name = "pagerduty-rs"
|
||||
version = "0.1.6"
|
||||
@ -3823,6 +4013,38 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost"
|
||||
version = "0.11.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e48e50df39172a3e7eb17e14642445da64996989bc212b583015435d39a58537"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"prost-derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost-derive"
|
||||
version = "0.11.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4ea9b0f8cbe5e15a8a042d030bd96668db28ecb567ec37d691971ff5731d2b1b"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"itertools",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost-types"
|
||||
version = "0.11.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "379119666929a1afd7a043aa6cf96fa67a6dce9af60c88095a4686dbce4c9c88"
|
||||
dependencies = [
|
||||
"prost",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ptr_meta"
|
||||
version = "0.1.4"
|
||||
@ -4762,9 +4984,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "serde"
|
||||
version = "1.0.152"
|
||||
version = "1.0.154"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bb7d1f0d3021d347a83e556fc4683dea2ea09d87bccdf88ff5c12545d89d5efb"
|
||||
checksum = "8cdd151213925e7f1ab45a9bbfb129316bd00799784b174b7cc7bcd16961c49e"
|
||||
dependencies = [
|
||||
"serde_derive",
|
||||
]
|
||||
@ -4781,9 +5003,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "serde_derive"
|
||||
version = "1.0.152"
|
||||
version = "1.0.154"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e"
|
||||
checksum = "4fc80d722935453bcafdc2c9a73cd6fac4dc1938f0346035d84bf99fa9e33217"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@ -4792,9 +5014,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "serde_json"
|
||||
version = "1.0.93"
|
||||
version = "1.0.94"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cad406b69c91885b5107daf2c29572f6c8cdb3c66826821e286c533490c0bc76"
|
||||
checksum = "1c533a59c9d8a93a09c6ab31f0fd5e5f4dd1b8fc9434804029839884765d04ea"
|
||||
dependencies = [
|
||||
"itoa",
|
||||
"ryu",
|
||||
@ -4941,6 +5163,27 @@ dependencies = [
|
||||
"lazy_static",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "signal-hook"
|
||||
version = "0.3.15"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "732768f1176d21d09e076c23a93123d40bba92d50c4058da34d45c8de8e682b9"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"signal-hook-registry",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "signal-hook-mio"
|
||||
version = "0.2.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "29ad2e15f37ec9a6cc544097b78a1ec90001e9f71b81338ca39f430adaca99af"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"mio 0.7.14",
|
||||
"signal-hook",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "signal-hook-registry"
|
||||
version = "1.4.1"
|
||||
@ -5035,9 +5278,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "socket2"
|
||||
version = "0.4.7"
|
||||
version = "0.4.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "02e2d2db9033d13a1567121ddd7a095ee144db4e1ca1b1bda3419bc0da294ebd"
|
||||
checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"winapi",
|
||||
@ -5299,9 +5542,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "syn"
|
||||
version = "1.0.107"
|
||||
version = "1.0.109"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1f4064b5b16e03ae50984a5a8ed5d4f8803e6bc1fd170a3cda91a1be4b18e3f5"
|
||||
checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@ -5489,15 +5732,15 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
|
||||
|
||||
[[package]]
|
||||
name = "tokio"
|
||||
version = "1.25.0"
|
||||
version = "1.26.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c8e00990ebabbe4c14c08aca901caed183ecd5c09562a12c824bb53d3c3fd3af"
|
||||
checksum = "03201d01c3c27a29c8a5cee5b55a93ddae1ccf6f08f65365c2c918f8c1b76f64"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"bytes",
|
||||
"libc",
|
||||
"memchr",
|
||||
"mio",
|
||||
"mio 0.8.6",
|
||||
"num_cpus",
|
||||
"parking_lot 0.12.1",
|
||||
"pin-project-lite",
|
||||
@ -5505,7 +5748,46 @@ dependencies = [
|
||||
"socket2",
|
||||
"tokio-macros",
|
||||
"tracing",
|
||||
"windows-sys 0.42.0",
|
||||
"windows-sys 0.45.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-console"
|
||||
version = "0.1.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fce5f0a53cd350a00b3a37dcb85758eb3c333beeb334b40584f7747b1e01374e"
|
||||
dependencies = [
|
||||
"atty",
|
||||
"clap",
|
||||
"clap_complete",
|
||||
"color-eyre",
|
||||
"console-api",
|
||||
"crossterm",
|
||||
"dirs",
|
||||
"futures",
|
||||
"h2",
|
||||
"hdrhistogram",
|
||||
"humantime",
|
||||
"once_cell",
|
||||
"prost-types",
|
||||
"regex",
|
||||
"serde",
|
||||
"tokio",
|
||||
"toml 0.5.11",
|
||||
"tonic",
|
||||
"tracing",
|
||||
"tracing-subscriber 0.3.16",
|
||||
"tui",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-io-timeout"
|
||||
version = "1.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf"
|
||||
dependencies = [
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -5668,6 +5950,38 @@ dependencies = [
|
||||
"toml_datetime 0.6.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tonic"
|
||||
version = "0.8.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
"axum",
|
||||
"base64 0.13.1",
|
||||
"bytes",
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"h2",
|
||||
"http",
|
||||
"http-body",
|
||||
"hyper",
|
||||
"hyper-timeout",
|
||||
"percent-encoding",
|
||||
"pin-project",
|
||||
"prost",
|
||||
"prost-derive",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"tower",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
"tracing-futures",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tower"
|
||||
version = "0.4.13"
|
||||
@ -5676,33 +5990,18 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"indexmap",
|
||||
"pin-project",
|
||||
"pin-project-lite",
|
||||
"rand",
|
||||
"slab",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tower-http"
|
||||
version = "0.3.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f873044bf02dd1e8239e9c1293ea39dad76dc594ec16185d0a1bf31d8dc8d858"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"bytes",
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"http",
|
||||
"http-body",
|
||||
"http-range-header",
|
||||
"pin-project-lite",
|
||||
"tower",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tower-http"
|
||||
version = "0.4.0"
|
||||
@ -5717,6 +6016,7 @@ dependencies = [
|
||||
"http-body",
|
||||
"http-range-header",
|
||||
"pin-project-lite",
|
||||
"tower",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
]
|
||||
@ -5767,6 +6067,16 @@ dependencies = [
|
||||
"valuable",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-error"
|
||||
version = "0.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b4d7c0b83d4a500748fa5879461652b361edf5c9d51ede2a2ac03875ca185e24"
|
||||
dependencies = [
|
||||
"tracing",
|
||||
"tracing-subscriber 0.2.25",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-futures"
|
||||
version = "0.2.5"
|
||||
@ -5777,6 +6087,17 @@ dependencies = [
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-log"
|
||||
version = "0.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922"
|
||||
dependencies = [
|
||||
"lazy_static",
|
||||
"log",
|
||||
"tracing-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-serde"
|
||||
version = "0.1.3"
|
||||
@ -5816,12 +6137,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a6176eae26dd70d0c919749377897b54a9276bd7061339665dd68777926b5a70"
|
||||
dependencies = [
|
||||
"matchers 0.1.0",
|
||||
"nu-ansi-term",
|
||||
"once_cell",
|
||||
"regex",
|
||||
"sharded-slab",
|
||||
"smallvec",
|
||||
"thread_local",
|
||||
"tracing",
|
||||
"tracing-core",
|
||||
"tracing-log",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -5836,6 +6160,19 @@ version = "0.2.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed"
|
||||
|
||||
[[package]]
|
||||
name = "tui"
|
||||
version = "0.16.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "39c8ce4e27049eed97cfa363a5048b09d995e209994634a0efc26a14ab6c0c23"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"cassowary",
|
||||
"crossterm",
|
||||
"unicode-segmentation",
|
||||
"unicode-width",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tungstenite"
|
||||
version = "0.17.3"
|
||||
@ -6203,6 +6540,7 @@ dependencies = [
|
||||
"axum-client-ip",
|
||||
"axum-macros",
|
||||
"chrono",
|
||||
"console-subscriber",
|
||||
"counter",
|
||||
"deferred-rate-limiter",
|
||||
"derive_more",
|
||||
@ -6220,6 +6558,7 @@ dependencies = [
|
||||
"handlebars",
|
||||
"hashbrown 0.13.2",
|
||||
"hdrhistogram",
|
||||
"hostname",
|
||||
"http",
|
||||
"influxdb2",
|
||||
"influxdb2-structmap",
|
||||
@ -6250,11 +6589,12 @@ dependencies = [
|
||||
"thread-fast-rng",
|
||||
"time 0.3.20",
|
||||
"tokio",
|
||||
"tokio-console",
|
||||
"tokio-stream",
|
||||
"tokio-uring",
|
||||
"toml 0.7.2",
|
||||
"tower",
|
||||
"tower-http 0.4.0",
|
||||
"tower-http",
|
||||
"ulid",
|
||||
"url",
|
||||
"uuid 1.3.0",
|
||||
|
15
Dockerfile
15
Dockerfile
@ -1,15 +1,13 @@
|
||||
#
|
||||
# cargo-nextest
|
||||
# We only pay the installation cost once,
|
||||
# it will be cached from the second build onwards
|
||||
#
|
||||
FROM rust:1.67.1-bullseye AS builder
|
||||
FROM rust:1.68.2-bullseye AS builder
|
||||
|
||||
WORKDIR /app
|
||||
ENV CARGO_TERM_COLOR always
|
||||
|
||||
# a next-generation test runner for Rust projects.
|
||||
# We only pay the installation cost once,
|
||||
# it will be cached from the second build onwards
|
||||
# TODO: more mount type cache?
|
||||
# TODO: do this in a seperate FROM and COPY it in
|
||||
RUN --mount=type=cache,target=/usr/local/cargo/registry \
|
||||
cargo install cargo-nextest
|
||||
|
||||
@ -18,7 +16,10 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \
|
||||
ENV PATH /root/.foundry/bin:$PATH
|
||||
RUN curl -L https://foundry.paradigm.xyz | bash && foundryup
|
||||
|
||||
RUN apt-get update && apt-get install --yes librdkafka-dev && rm -rf /var/lib/apt/lists/*
|
||||
# install web3-proxy system dependencies. most things are rust-only, but not everything
|
||||
RUN apt-get update && \
|
||||
apt-get install --yes librdkafka-dev && \
|
||||
rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# copy the application
|
||||
COPY . .
|
||||
|
15
TODO.md
15
TODO.md
@ -415,13 +415,13 @@ These are not yet ordered. There might be duplicates. We might not actually need
|
||||
- change premium concurrency limit to be against ip+rpckey
|
||||
- then sites like curve.fi don't have to worry about their user count
|
||||
- it does mean we will have a harder time capacity planning from the number of keys
|
||||
- [ ] eth_getLogs is going to unsynced nodes when synced nodes are available. always prefer synced nodes
|
||||
- [ ] have the healthcheck get the block over http. if it errors, or doesn't match what the websocket says, something is wrong (likely a deadlock in the websocket code)
|
||||
- [ ] don't use new_head_provider anywhere except new head subscription
|
||||
- [ ] maybe we shouldn't route eth_getLogs to syncing nodes. serving queries slows down sync significantly
|
||||
- [x] maybe we shouldn't route eth_getLogs to syncing nodes. serving queries slows down sync significantly
|
||||
- change the send_best function to only include servers that are at least close to fully synced
|
||||
- [ ] have private transactions be enabled by a url setting rather than a setting on the key
|
||||
- [ ] enable mev protected transactions with either a /protect/ url (instead of /private/) or the database (when on /rpc/)
|
||||
- [-] have private transactions be enabled by a url setting rather than a setting on the key
|
||||
- [ ] eth_sendRawTransaction should only forward if the chain_id matches what we are running
|
||||
- [ ] cli for adding rpc keys to an existing user
|
||||
- [ ] rename "private" to "mev protected" to avoid confusion about private transactions being public once they are mined
|
||||
- [ ] allow restricting an rpc key to specific chains
|
||||
@ -463,6 +463,10 @@ These are not yet ordered. There might be duplicates. We might not actually need
|
||||
- [ ] implement remaining subscriptions
|
||||
- would be nice if our subscriptions had better gaurentees than geth/erigon do, but maybe simpler to just setup a broadcast channel and proxy all the respones to a backend instead
|
||||
- [ ] tests should use `test-env-log = "0.2.8"`
|
||||
- [ ] eth_sendRawTransaction should only forward if the chain_id matches what we are running
|
||||
- [ ] weighted random choice should still prioritize non-archive servers
|
||||
- maybe shuffle randomly and then sort by (block_limit, random_index)?
|
||||
- maybe sum available_requests grouped by archive/non-archive. only limit to non-archive if they have enough?
|
||||
- [ ] some places we call it "accounting" others a "stat". be consistent
|
||||
- [ ] cli commands to search users by key
|
||||
- [ ] flamegraphs show 25% of the time to be in moka-housekeeper. tune that
|
||||
@ -603,6 +607,8 @@ These are not ordered. I think some rows also accidently got deleted here. Check
|
||||
- look at average request time for getBlock? i'm not sure how good a proxy that will be for serving eth_call, but its a start
|
||||
- https://crates.io/crates/histogram-sampler
|
||||
- [ ] interval for http subscriptions should be based on block time. load from config is easy, but better to query. currently hard coded to 13 seconds
|
||||
- [ ] check code to keep us from going backwards. maybe that is causing outages
|
||||
- [ ] min_backup_rpcs seperate from min_synced_rpcs
|
||||
|
||||
in another repo: event subscriber
|
||||
- [ ] watch for transfer events to our contract and submit them to /payment/$tx_hash
|
||||
@ -729,13 +735,10 @@ in another repo: event subscriber
|
||||
- [ ] have an upgrade tier that queries multiple backends at once. returns on first Ok result, collects errors. if no Ok, find the most common error and then respond with that
|
||||
- [ ] give public_recent_ips_salt a better, more general, name
|
||||
- [ ] include tier in the head block logs?
|
||||
<<<<<<< HEAD
|
||||
- [ ] i think i use FuturesUnordered when a try_join_all might be better
|
||||
- [ ] since we are read-heavy on our configs, maybe we should use a cache
|
||||
- "using a thread local storage and explicit types" https://docs.rs/arc-swap/latest/arc_swap/cache/struct.Cache.html
|
||||
- [ ] tests for config reloading
|
||||
- [ ] use pin instead of arc for a bunch of things?
|
||||
- https://fasterthanli.me/articles/pin-and-suffering
|
||||
=======
|
||||
- [ ] calculate archive depth automatically based on block_data_limits
|
||||
>>>>>>> 77df3fa (stats v2)
|
||||
|
@ -11,4 +11,4 @@ anyhow = "1.0.69"
|
||||
hashbrown = "0.13.2"
|
||||
log = "0.4.17"
|
||||
moka = { version = "0.10.0", default-features = false, features = ["future"] }
|
||||
tokio = "1.25.0"
|
||||
tokio = "1.26.0"
|
||||
|
@ -11,7 +11,7 @@ path = "src/mod.rs"
|
||||
|
||||
[dependencies]
|
||||
sea-orm = "0.11.0"
|
||||
serde = "1.0.152"
|
||||
serde = "1.0.154"
|
||||
uuid = "1.3.0"
|
||||
ethers = "1.0.2"
|
||||
ulid = "1.0.0"
|
||||
|
@ -40,6 +40,7 @@ pub struct Model {
|
||||
pub max_response_bytes: u64,
|
||||
pub archive_request: bool,
|
||||
pub origin: Option<String>,
|
||||
pub migrated: Option<DateTime>
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
||||
|
@ -9,7 +9,7 @@ name = "migration"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[dependencies]
|
||||
tokio = { version = "1.25.0", features = ["full", "tracing"] }
|
||||
tokio = { version = "1.26.0", features = ["full", "tracing"] }
|
||||
|
||||
[dependencies.sea-orm-migration]
|
||||
version = "0.11.0"
|
||||
|
@ -6,4 +6,4 @@ edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
flume = "0.10.14"
|
||||
tokio = { version = "1.25.0", features = ["time"] }
|
||||
tokio = { version = "1.26.0", features = ["time"] }
|
||||
|
@ -8,4 +8,4 @@ edition = "2021"
|
||||
anyhow = "1.0.69"
|
||||
chrono = "0.4.23"
|
||||
deadpool-redis = { version = "0.11.1", features = ["rt_tokio_1", "serde"] }
|
||||
tokio = "1.25.0"
|
||||
tokio = "1.26.0"
|
||||
|
@ -1,2 +1,2 @@
|
||||
[toolchain]
|
||||
channel = "1.67.1"
|
||||
channel = "1.68.2"
|
||||
|
1
scripts/manual-tests/21-sql-migration-make-backup.sh
Normal file
1
scripts/manual-tests/21-sql-migration-make-backup.sh
Normal file
@ -0,0 +1 @@
|
||||
mysqldump -u root --password=dev_web3_proxy -h 127.0.0.1 --port 13306
|
@ -0,0 +1,17 @@
|
||||
SELECT COUNT(*) FROM rpc_accounting WHERE migrated IS NULL;
|
||||
UPDATE rpc_accounting SET migrated = NULL;
|
||||
|
||||
SELECT SUM(frontend_requests) FROM rpc_accounting;
|
||||
SELECT SUM(frontend_requests) FROM rpc_accounting_v2;
|
||||
|
||||
SELECT SUM(backend_requests) FROM rpc_accounting;
|
||||
SELECT SUM(backend_requests) FROM rpc_accounting_v2;
|
||||
|
||||
SELECT SUM(sum_request_bytes) FROM rpc_accounting;
|
||||
SELECT SUM(sum_request_bytes) FROM rpc_accounting_v2;
|
||||
|
||||
SELECT SUM(sum_response_millis) FROM rpc_accounting;
|
||||
SELECT SUM(sum_response_millis) FROM rpc_accounting_v2;
|
||||
|
||||
SELECT SUM(sum_response_bytes) FROM rpc_accounting;
|
||||
SELECT SUM(sum_response_bytes) FROM rpc_accounting_v2;
|
@ -7,8 +7,9 @@ default-run = "web3_proxy_cli"
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[features]
|
||||
default = ["deadlock_detection"]
|
||||
default = ["deadlock_detection", "tokio-console"]
|
||||
deadlock_detection = ["parking_lot/deadlock_detection"]
|
||||
tokio-console = ["dep:tokio-console", "dep:console-subscriber"]
|
||||
|
||||
# TODO: turn tokio-console on with a feature. console-subscriber = { version = "0.1.7" }
|
||||
|
||||
@ -27,10 +28,11 @@ thread-fast-rng = { path = "../thread-fast-rng" }
|
||||
|
||||
anyhow = { version = "1.0.69", features = ["backtrace"] }
|
||||
argh = "0.1.10"
|
||||
axum = { version = "0.6.8", features = ["headers", "ws"] }
|
||||
axum = { version = "0.6.10", features = ["headers", "ws"] }
|
||||
axum-client-ip = "0.4.0"
|
||||
axum-macros = "0.3.4"
|
||||
axum-macros = "0.3.5"
|
||||
chrono = "0.4.23"
|
||||
console-subscriber = { version = "*", optional = true }
|
||||
counter = "0.5.7"
|
||||
derive_more = "0.99.17"
|
||||
dotenv = "0.15.0"
|
||||
@ -49,6 +51,7 @@ hdrhistogram = "7.5.2"
|
||||
http = "0.2.9"
|
||||
influxdb2 = { version = "0.3", features = ["rustls"] }
|
||||
influxdb2-structmap = "0.2.0"
|
||||
hostname = "0.3.1"
|
||||
ipnet = "2.7.1"
|
||||
itertools = "0.10.5"
|
||||
log = "0.4.17"
|
||||
@ -67,12 +70,13 @@ reqwest = { version = "0.11.14", default-features = false, features = ["json", "
|
||||
rmp-serde = "1.1.1"
|
||||
rustc-hash = "1.1.0"
|
||||
sentry = { version = "0.30.0", default-features = false, features = ["backtrace", "contexts", "panic", "anyhow", "reqwest", "rustls", "log", "sentry-log"] }
|
||||
serde = { version = "1.0.152", features = [] }
|
||||
serde_json = { version = "1.0.93", default-features = false, features = ["alloc", "raw_value"] }
|
||||
serde = { version = "1.0.154", features = [] }
|
||||
serde_json = { version = "1.0.94", default-features = false, features = ["alloc", "raw_value"] }
|
||||
serde_prometheus = "0.2.1"
|
||||
siwe = "0.5.0"
|
||||
time = "0.3.20"
|
||||
tokio = { version = "1.25.0", features = ["full"] }
|
||||
tokio = { version = "1.26.0", features = ["full"] }
|
||||
tokio-console = { version = "*", optional = true }
|
||||
tokio-stream = { version = "0.1.12", features = ["sync"] }
|
||||
tokio-uring = { version = "0.4.0", optional = true }
|
||||
toml = "0.7.2"
|
||||
|
@ -68,7 +68,7 @@ pub static APP_USER_AGENT: &str = concat!(
|
||||
);
|
||||
|
||||
// aggregate across 1 week
|
||||
const BILLING_PERIOD_SECONDS: i64 = 60 * 60 * 24 * 7;
|
||||
pub const BILLING_PERIOD_SECONDS: i64 = 60 * 60 * 24 * 7;
|
||||
|
||||
#[derive(Debug, From)]
|
||||
struct ResponseCacheKey {
|
||||
@ -229,6 +229,7 @@ pub struct Web3ProxyApp {
|
||||
pub db_conn: Option<sea_orm::DatabaseConnection>,
|
||||
/// Optional read-only database for users and accounting
|
||||
pub db_replica: Option<DatabaseReplica>,
|
||||
pub hostname: Option<String>,
|
||||
/// store pending transactions that we've seen so that we don't send duplicates to subscribers
|
||||
/// TODO: think about this more. might be worth storing if we sent the transaction or not and using this for automatic retries
|
||||
pub pending_transactions: Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
|
||||
@ -576,7 +577,12 @@ impl Web3ProxyApp {
|
||||
// stats can be saved in mysql, influxdb, both, or none
|
||||
let stat_sender = if let Some(emitter_spawn) = StatBuffer::try_spawn(
|
||||
top_config.app.chain_id,
|
||||
top_config.app.influxdb_bucket.clone().context("No influxdb bucket was provided")?.to_owned(),
|
||||
top_config
|
||||
.app
|
||||
.influxdb_bucket
|
||||
.clone()
|
||||
.context("No influxdb bucket was provided")?
|
||||
.to_owned(),
|
||||
db_conn.clone(),
|
||||
influxdb_client.clone(),
|
||||
60,
|
||||
@ -761,6 +767,10 @@ impl Web3ProxyApp {
|
||||
Some(private_rpcs)
|
||||
};
|
||||
|
||||
let hostname = hostname::get()
|
||||
.ok()
|
||||
.and_then(|x| x.to_str().map(|x| x.to_string()));
|
||||
|
||||
let app = Self {
|
||||
config: top_config.app.clone(),
|
||||
balanced_rpcs,
|
||||
@ -777,6 +787,7 @@ impl Web3ProxyApp {
|
||||
db_conn,
|
||||
db_replica,
|
||||
influxdb_client,
|
||||
hostname,
|
||||
vredis_pool,
|
||||
rpc_secret_key_cache,
|
||||
bearer_token_semaphores,
|
||||
@ -1571,7 +1582,9 @@ impl Web3ProxyApp {
|
||||
match &request.params {
|
||||
Some(serde_json::Value::Array(params)) => {
|
||||
// TODO: make a struct and use serde conversion to clean this up
|
||||
if params.len() != 1 || !params[0].is_string() {
|
||||
if params.len() != 1
|
||||
|| !params.get(0).map(|x| x.is_string()).unwrap_or(false)
|
||||
{
|
||||
// TODO: what error code?
|
||||
return Ok((
|
||||
JsonRpcForwardedResponse::from_str(
|
||||
@ -1785,7 +1798,7 @@ impl Web3ProxyApp {
|
||||
|
||||
if let Some(stat_sender) = self.stat_sender.as_ref() {
|
||||
let response_stat = RpcQueryStats::new(
|
||||
method.to_string(),
|
||||
Some(method.to_string()),
|
||||
authorization.clone(),
|
||||
request_metadata,
|
||||
response.num_bytes(),
|
||||
@ -1808,7 +1821,7 @@ impl Web3ProxyApp {
|
||||
|
||||
if let Some(stat_sender) = self.stat_sender.as_ref() {
|
||||
let response_stat = RpcQueryStats::new(
|
||||
request_method,
|
||||
Some(request_method),
|
||||
authorization.clone(),
|
||||
request_metadata,
|
||||
response.num_bytes(),
|
||||
|
@ -96,7 +96,7 @@ impl Web3ProxyApp {
|
||||
|
||||
if let Some(stat_sender) = stat_sender.as_ref() {
|
||||
let response_stat = RpcQueryStats::new(
|
||||
"eth_subscription(newHeads)".to_string(),
|
||||
Some("eth_subscription(newHeads)".to_string()),
|
||||
authorization.clone(),
|
||||
request_metadata.clone(),
|
||||
response_bytes,
|
||||
@ -167,7 +167,7 @@ impl Web3ProxyApp {
|
||||
|
||||
if let Some(stat_sender) = stat_sender.as_ref() {
|
||||
let response_stat = RpcQueryStats::new(
|
||||
"eth_subscription(newPendingTransactions)".to_string(),
|
||||
Some("eth_subscription(newPendingTransactions)".to_string()),
|
||||
authorization.clone(),
|
||||
request_metadata.clone(),
|
||||
response_bytes,
|
||||
@ -243,7 +243,7 @@ impl Web3ProxyApp {
|
||||
|
||||
if let Some(stat_sender) = stat_sender.as_ref() {
|
||||
let response_stat = RpcQueryStats::new(
|
||||
"eth_subscription(newPendingFullTransactions)".to_string(),
|
||||
Some("eth_subscription(newPendingFullTransactions)".to_string()),
|
||||
authorization.clone(),
|
||||
request_metadata.clone(),
|
||||
response_bytes,
|
||||
@ -319,7 +319,7 @@ impl Web3ProxyApp {
|
||||
|
||||
if let Some(stat_sender) = stat_sender.as_ref() {
|
||||
let response_stat = RpcQueryStats::new(
|
||||
"eth_subscription(newPendingRawTransactions)".to_string(),
|
||||
Some("eth_subscription(newPendingRawTransactions)".to_string()),
|
||||
authorization.clone(),
|
||||
request_metadata.clone(),
|
||||
response_bytes,
|
||||
@ -350,7 +350,7 @@ impl Web3ProxyApp {
|
||||
|
||||
if let Some(stat_sender) = self.stat_sender.as_ref() {
|
||||
let response_stat = RpcQueryStats::new(
|
||||
request_json.method.clone(),
|
||||
Some(request_json.method.clone()),
|
||||
authorization.clone(),
|
||||
request_metadata,
|
||||
response.num_bytes(),
|
||||
|
@ -2,7 +2,7 @@ use anyhow::Context;
|
||||
use argh::FromArgs;
|
||||
use entities::{admin, login, user};
|
||||
use ethers::types::Address;
|
||||
use log::debug;
|
||||
use log::{debug, info};
|
||||
use migration::sea_orm::{
|
||||
self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, ModelTrait, QueryFilter,
|
||||
};
|
||||
@ -10,7 +10,7 @@ use migration::sea_orm::{
|
||||
/// change a user's admin status. eiter they are an admin, or they aren't
|
||||
#[derive(FromArgs, PartialEq, Eq, Debug)]
|
||||
#[argh(subcommand, name = "change_admin_status")]
|
||||
pub struct ChangeUserAdminStatusSubCommand {
|
||||
pub struct ChangeAdminStatusSubCommand {
|
||||
/// the address of the user whose admin status you want to modify
|
||||
#[argh(positional)]
|
||||
address: String,
|
||||
@ -20,31 +20,33 @@ pub struct ChangeUserAdminStatusSubCommand {
|
||||
should_be_admin: bool,
|
||||
}
|
||||
|
||||
impl ChangeUserAdminStatusSubCommand {
|
||||
impl ChangeAdminStatusSubCommand {
|
||||
pub async fn main(self, db_conn: &DatabaseConnection) -> anyhow::Result<()> {
|
||||
let address: Address = self.address.parse()?;
|
||||
let should_be_admin: bool = self.should_be_admin;
|
||||
|
||||
let address: Vec<u8> = address.to_fixed_bytes().into();
|
||||
// we keep "address" around for use in logs
|
||||
let address_vec: Vec<u8> = address.to_fixed_bytes().into();
|
||||
|
||||
// Find user in database
|
||||
let user = user::Entity::find()
|
||||
.filter(user::Column::Address.eq(address.clone()))
|
||||
.filter(user::Column::Address.eq(address_vec))
|
||||
.one(db_conn)
|
||||
.await?
|
||||
.context(format!("No user with this id found {:?}", address))?;
|
||||
|
||||
debug!("user: {:#?}", user);
|
||||
debug!("user: {}", serde_json::to_string_pretty(&user)?);
|
||||
|
||||
// Check if there is a record in the database
|
||||
match admin::Entity::find()
|
||||
.filter(admin::Column::UserId.eq(address))
|
||||
.filter(admin::Column::UserId.eq(user.id))
|
||||
.one(db_conn)
|
||||
.await?
|
||||
{
|
||||
Some(old_admin) if !should_be_admin => {
|
||||
// User is already an admin, but shouldn't be
|
||||
old_admin.delete(db_conn).await?;
|
||||
info!("revoked admin status");
|
||||
}
|
||||
None if should_be_admin => {
|
||||
// User is not an admin yet, but should be
|
||||
@ -53,11 +55,11 @@ impl ChangeUserAdminStatusSubCommand {
|
||||
..Default::default()
|
||||
};
|
||||
new_admin.insert(db_conn).await?;
|
||||
info!("granted admin status");
|
||||
}
|
||||
_ => {
|
||||
// Do nothing in this case
|
||||
debug!("no change needed for: {:#?}", user);
|
||||
// Early return
|
||||
info!("no change needed for: {:#?}", user);
|
||||
// Since no change happened, we do not want to delete active logins. Return now.
|
||||
return Ok(());
|
||||
}
|
||||
}
|
@ -1,5 +1,5 @@
|
||||
mod change_admin_status;
|
||||
mod change_user_address;
|
||||
mod change_user_admin_status;
|
||||
mod change_user_tier;
|
||||
mod change_user_tier_by_address;
|
||||
mod change_user_tier_by_key;
|
||||
@ -68,8 +68,8 @@ pub struct Web3ProxyCli {
|
||||
#[derive(FromArgs, PartialEq, Debug)]
|
||||
#[argh(subcommand)]
|
||||
enum SubCommand {
|
||||
ChangeAdminStatus(change_admin_status::ChangeAdminStatusSubCommand),
|
||||
ChangeUserAddress(change_user_address::ChangeUserAddressSubCommand),
|
||||
ChangeUserAdminStatus(change_user_admin_status::ChangeUserAdminStatusSubCommand),
|
||||
ChangeUserTier(change_user_tier::ChangeUserTierSubCommand),
|
||||
ChangeUserTierByAddress(change_user_tier_by_address::ChangeUserTierByAddressSubCommand),
|
||||
ChangeUserTierByKey(change_user_tier_by_key::ChangeUserTierByKeySubCommand),
|
||||
@ -120,6 +120,10 @@ fn main() -> anyhow::Result<()> {
|
||||
|
||||
// if RUST_LOG isn't set, configure a default
|
||||
// TODO: is there a better way to do this?
|
||||
#[cfg(tokio_console)]
|
||||
console_subscriber::init();
|
||||
|
||||
#[cfg(not(tokio_console))]
|
||||
let rust_log = match std::env::var("RUST_LOG") {
|
||||
Ok(x) => x,
|
||||
Err(_) => match std::env::var("WEB3_PROXY_TRACE").map(|x| x == "true") {
|
||||
@ -192,35 +196,38 @@ fn main() -> anyhow::Result<()> {
|
||||
(None, None)
|
||||
};
|
||||
|
||||
let logger = env_logger::builder().parse_filters(&rust_log).build();
|
||||
#[cfg(not(tokio_console))]
|
||||
{
|
||||
let logger = env_logger::builder().parse_filters(&rust_log).build();
|
||||
|
||||
let max_level = logger.filter();
|
||||
let max_level = logger.filter();
|
||||
|
||||
// connect to sentry for error reporting
|
||||
// if no sentry, only log to stdout
|
||||
let _sentry_guard = if let Some(sentry_url) = cli_config.sentry_url.clone() {
|
||||
let logger = sentry::integrations::log::SentryLogger::with_dest(logger);
|
||||
// connect to sentry for error reporting
|
||||
// if no sentry, only log to stdout
|
||||
let _sentry_guard = if let Some(sentry_url) = cli_config.sentry_url.clone() {
|
||||
let logger = sentry::integrations::log::SentryLogger::with_dest(logger);
|
||||
|
||||
log::set_boxed_logger(Box::new(logger)).unwrap();
|
||||
log::set_boxed_logger(Box::new(logger)).unwrap();
|
||||
|
||||
let guard = sentry::init((
|
||||
sentry_url,
|
||||
sentry::ClientOptions {
|
||||
release: sentry::release_name!(),
|
||||
// TODO: Set this a to lower value (from config) in production
|
||||
traces_sample_rate: 1.0,
|
||||
..Default::default()
|
||||
},
|
||||
));
|
||||
let guard = sentry::init((
|
||||
sentry_url,
|
||||
sentry::ClientOptions {
|
||||
release: sentry::release_name!(),
|
||||
// TODO: Set this a to lower value (from config) in production
|
||||
traces_sample_rate: 1.0,
|
||||
..Default::default()
|
||||
},
|
||||
));
|
||||
|
||||
Some(guard)
|
||||
} else {
|
||||
log::set_boxed_logger(Box::new(logger)).unwrap();
|
||||
Some(guard)
|
||||
} else {
|
||||
log::set_boxed_logger(Box::new(logger)).unwrap();
|
||||
|
||||
None
|
||||
};
|
||||
None
|
||||
};
|
||||
|
||||
log::set_max_level(max_level);
|
||||
log::set_max_level(max_level);
|
||||
}
|
||||
|
||||
info!("{}", APP_USER_AGENT);
|
||||
|
||||
@ -283,19 +290,19 @@ fn main() -> anyhow::Result<()> {
|
||||
|
||||
rt.block_on(async {
|
||||
match cli_config.sub_command {
|
||||
SubCommand::ChangeUserAddress(x) => {
|
||||
let db_url = cli_config
|
||||
.db_url
|
||||
.expect("'--config' (with a db) or '--db-url' is required to run proxyd");
|
||||
SubCommand::ChangeAdminStatus(x) => {
|
||||
let db_url = cli_config.db_url.expect(
|
||||
"'--config' (with a db) or '--db-url' is required to run change_admin_status",
|
||||
);
|
||||
|
||||
let db_conn = get_db(db_url, 1, 1).await?;
|
||||
|
||||
x.main(&db_conn).await
|
||||
}
|
||||
SubCommand::ChangeUserAdminStatus(x) => {
|
||||
SubCommand::ChangeUserAddress(x) => {
|
||||
let db_url = cli_config
|
||||
.db_url
|
||||
.expect("'--config' (with a db) or '--db-url' is required to run change_user_admin_status");
|
||||
.expect("'--config' (with a db) or '--db-url' is required to run change_user_addres");
|
||||
|
||||
let db_conn = get_db(db_url, 1, 1).await?;
|
||||
|
||||
@ -304,16 +311,16 @@ fn main() -> anyhow::Result<()> {
|
||||
SubCommand::ChangeUserTier(x) => {
|
||||
let db_url = cli_config
|
||||
.db_url
|
||||
.expect("'--config' (with a db) or '--db-url' is required to run proxyd");
|
||||
.expect("'--config' (with a db) or '--db-url' is required to run change_user_tier");
|
||||
|
||||
let db_conn = get_db(db_url, 1, 1).await?;
|
||||
|
||||
x.main(&db_conn).await
|
||||
}
|
||||
SubCommand::ChangeUserTierByAddress(x) => {
|
||||
let db_url = cli_config
|
||||
.db_url
|
||||
.expect("'--config' (with a db) or '--db-url' is required to run change_user_admin_status");
|
||||
let db_url = cli_config.db_url.expect(
|
||||
"'--config' (with a db) or '--db-url' is required to run change_user_tier_by_address",
|
||||
);
|
||||
|
||||
let db_conn = get_db(db_url, 1, 1).await?;
|
||||
|
||||
@ -322,7 +329,7 @@ fn main() -> anyhow::Result<()> {
|
||||
SubCommand::ChangeUserTierByKey(x) => {
|
||||
let db_url = cli_config
|
||||
.db_url
|
||||
.expect("'--config' (with a db) or '--db-url' is required to run proxyd");
|
||||
.expect("'--config' (with a db) or '--db-url' is required to run change_user_tier_by_key");
|
||||
|
||||
let db_conn = get_db(db_url, 1, 1).await?;
|
||||
|
||||
@ -341,7 +348,7 @@ fn main() -> anyhow::Result<()> {
|
||||
SubCommand::CreateUser(x) => {
|
||||
let db_url = cli_config
|
||||
.db_url
|
||||
.expect("'--config' (with a db) or '--db-url' is required to run proxyd");
|
||||
.expect("'--config' (with a db) or '--db-url' is required to run create_user");
|
||||
|
||||
let db_conn = get_migrated_db(db_url, 1, 1).await?;
|
||||
|
||||
@ -350,7 +357,7 @@ fn main() -> anyhow::Result<()> {
|
||||
SubCommand::CountUsers(x) => {
|
||||
let db_url = cli_config
|
||||
.db_url
|
||||
.expect("'--config' (with a db) or '--db-url' is required to run proxyd");
|
||||
.expect("'--config' (with a db) or '--db-url' is required to run count_users");
|
||||
|
||||
let db_conn = get_db(db_url, 1, 1).await?;
|
||||
|
||||
@ -366,20 +373,25 @@ fn main() -> anyhow::Result<()> {
|
||||
SubCommand::DropMigrationLock(x) => {
|
||||
let db_url = cli_config
|
||||
.db_url
|
||||
.expect("'--config' (with a db) or '--db-url' is required to run proxyd");
|
||||
.expect("'--config' (with a db) or '--db-url' is required to run drop_migration_lock");
|
||||
|
||||
// very intentionally, do NOT run migrations here
|
||||
// very intentionally, do NOT run migrations here. that would wait forever if the migration lock is abandoned
|
||||
let db_conn = get_db(db_url, 1, 1).await?;
|
||||
|
||||
x.main(&db_conn).await
|
||||
}
|
||||
SubCommand::MigrateStatsToV2(x) => {
|
||||
|
||||
let top_config = top_config.expect("--config is required to run the migration from stats-mysql to stats-influx");
|
||||
// let top_config_path =
|
||||
// top_config_path.expect("path must be set if top_config exists");
|
||||
|
||||
let db_url = cli_config
|
||||
.db_url
|
||||
.expect("'--config' (with a db) or '--db-url' is required to run the migration from stats-mysql to stats-influx");
|
||||
|
||||
let db_conn = get_db(db_url, 1, 1).await?;
|
||||
x.main(&db_conn).await
|
||||
x.main(top_config, &db_conn).await
|
||||
}
|
||||
SubCommand::Pagerduty(x) => {
|
||||
if cli_config.sentry_url.is_none() {
|
||||
@ -389,9 +401,7 @@ fn main() -> anyhow::Result<()> {
|
||||
x.main(pagerduty_async, top_config).await
|
||||
}
|
||||
SubCommand::PopularityContest(x) => x.main().await,
|
||||
SubCommand::SearchKafka(x) => {
|
||||
x.main(top_config.unwrap()).await
|
||||
},
|
||||
SubCommand::SearchKafka(x) => x.main(top_config.unwrap()).await,
|
||||
SubCommand::Sentryd(x) => {
|
||||
if cli_config.sentry_url.is_none() {
|
||||
warn!("sentry_url is not set! Logs will only show in this console");
|
||||
@ -402,7 +412,7 @@ fn main() -> anyhow::Result<()> {
|
||||
SubCommand::RpcAccounting(x) => {
|
||||
let db_url = cli_config
|
||||
.db_url
|
||||
.expect("'--config' (with a db) or '--db-url' is required to run proxyd");
|
||||
.expect("'--config' (with a db) or '--db-url' is required to run rpc_accounting");
|
||||
|
||||
let db_conn = get_migrated_db(db_url, 1, 1).await?;
|
||||
|
||||
@ -411,7 +421,7 @@ fn main() -> anyhow::Result<()> {
|
||||
SubCommand::TransferKey(x) => {
|
||||
let db_url = cli_config
|
||||
.db_url
|
||||
.expect("'--config' (with a db) or '--db-url' is required to run proxyd");
|
||||
.expect("'--config' (with a db) or '--db-url' is required to run transfer_key");
|
||||
let db_conn = get_db(db_url, 1, 1).await?;
|
||||
|
||||
x.main(&db_conn).await
|
||||
@ -419,7 +429,7 @@ fn main() -> anyhow::Result<()> {
|
||||
SubCommand::UserExport(x) => {
|
||||
let db_url = cli_config
|
||||
.db_url
|
||||
.expect("'--config' (with a db) or '--db-url' is required to run proxyd");
|
||||
.expect("'--config' (with a db) or '--db-url' is required to run user_export");
|
||||
|
||||
let db_conn = get_migrated_db(db_url, 1, 1).await?;
|
||||
|
||||
@ -428,7 +438,7 @@ fn main() -> anyhow::Result<()> {
|
||||
SubCommand::UserImport(x) => {
|
||||
let db_url = cli_config
|
||||
.db_url
|
||||
.expect("'--config' (with a db) or '--db-url' is required to run proxyd");
|
||||
.expect("'--config' (with a db) or '--db-url' is required to run user_import");
|
||||
|
||||
let db_conn = get_migrated_db(db_url, 1, 1).await?;
|
||||
|
||||
|
@ -1,27 +1,135 @@
|
||||
use anyhow::Context;
|
||||
use argh::FromArgs;
|
||||
use entities::{rpc_accounting, rpc_accounting_v2, user};
|
||||
use chrono::{DateTime, Utc};
|
||||
use entities::{rpc_accounting, rpc_accounting_v2, rpc_key, user};
|
||||
use ethers::types::Address;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::StreamExt;
|
||||
use hashbrown::HashMap;
|
||||
use log::{debug, info, warn};
|
||||
use log::{debug, error, info, trace, warn};
|
||||
use migration::sea_orm::{
|
||||
self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel,
|
||||
QueryFilter, QuerySelect
|
||||
QueryFilter, QuerySelect, UpdateResult,
|
||||
};
|
||||
use web3_proxy::stats::{BufferedRpcQueryStats, RpcQueryKey};
|
||||
use migration::{Expr, Value};
|
||||
use std::mem::swap;
|
||||
use std::net::{IpAddr, Ipv4Addr};
|
||||
use std::num::NonZeroU64;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::time::{sleep, Instant};
|
||||
use web3_proxy::app::{AuthorizationChecks, Web3ProxyApp, BILLING_PERIOD_SECONDS};
|
||||
use web3_proxy::config::TopConfig;
|
||||
use web3_proxy::frontend::authorization::{
|
||||
Authorization, AuthorizationType, RequestMetadata, RpcSecretKey,
|
||||
};
|
||||
use web3_proxy::stats::{BufferedRpcQueryStats, RpcQueryKey, RpcQueryStats, StatBuffer};
|
||||
|
||||
// Helper function to go from DateTime to Instant
|
||||
fn datetime_utc_to_instant(datetime: DateTime<Utc>) -> anyhow::Result<Instant> {
|
||||
let epoch = datetime.timestamp(); // Get the Unix timestamp
|
||||
let nanos = datetime.timestamp_subsec_nanos();
|
||||
|
||||
let duration_since_epoch = Duration::new(epoch as u64, nanos);
|
||||
// let duration_since_datetime = Duration::new(, nanos);
|
||||
let instant_new = Instant::now();
|
||||
warn!("Instant new is: {:?}", instant_new);
|
||||
let unix_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
|
||||
warn!("Instant since unix epoch is: {:?}", unix_epoch);
|
||||
|
||||
instant_new
|
||||
.checked_sub(unix_epoch)
|
||||
.context("Could not subtract unix epoch from instant now")?
|
||||
.checked_add(duration_since_epoch)
|
||||
.context("Could not add duration since epoch for updated time")
|
||||
}
|
||||
|
||||
/// change a user's address.
|
||||
#[derive(FromArgs, PartialEq, Eq, Debug)]
|
||||
#[argh(subcommand, name = "migrate_stats_to_v2")]
|
||||
pub struct MigrateStatsToV2 {}
|
||||
|
||||
// I mean drop(sender) and then important_background_handle.await. No need for shutdown signal here I think.
|
||||
// Don't make data lossy
|
||||
|
||||
impl MigrateStatsToV2 {
|
||||
pub async fn main(self, db_conn: &DatabaseConnection) -> anyhow::Result<()> {
|
||||
pub async fn main(
|
||||
self,
|
||||
top_config: TopConfig,
|
||||
db_conn: &DatabaseConnection,
|
||||
) -> anyhow::Result<()> {
|
||||
// Also add influxdb container ...
|
||||
// let mut spawned_app =
|
||||
// Web3ProxyApp::spawn(top_config.clone(), 2, app_shutdown_sender.clone()).await?;
|
||||
|
||||
let number_of_rows_to_process_at_once = 500;
|
||||
|
||||
// we wouldn't really need this, but let's spawn this anyways
|
||||
// easier than debugging the rest I suppose
|
||||
let (app_shutdown_sender, _app_shutdown_receiver) = broadcast::channel(1);
|
||||
let rpc_account_shutdown_recevier = app_shutdown_sender.subscribe();
|
||||
|
||||
// we must wait for these to end on their own (and they need to subscribe to shutdown_sender)
|
||||
let mut important_background_handles = FuturesUnordered::new();
|
||||
|
||||
// Spawn the influxdb
|
||||
let influxdb_client = match top_config.app.influxdb_host.as_ref() {
|
||||
Some(influxdb_host) => {
|
||||
let influxdb_org = top_config
|
||||
.app
|
||||
.influxdb_org
|
||||
.clone()
|
||||
.expect("influxdb_org needed when influxdb_host is set");
|
||||
let influxdb_token = top_config
|
||||
.app
|
||||
.influxdb_token
|
||||
.clone()
|
||||
.expect("influxdb_token needed when influxdb_host is set");
|
||||
|
||||
let influxdb_client =
|
||||
influxdb2::Client::new(influxdb_host, influxdb_org, influxdb_token);
|
||||
|
||||
// TODO: test the client now. having a stat for "started" can be useful on graphs to mark deploys
|
||||
|
||||
Some(influxdb_client)
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
// Spawn the stat-sender
|
||||
let stat_sender = if let Some(emitter_spawn) = StatBuffer::try_spawn(
|
||||
top_config.app.chain_id,
|
||||
top_config
|
||||
.app
|
||||
.influxdb_bucket
|
||||
.clone()
|
||||
.context("No influxdb bucket was provided")?
|
||||
.to_owned(),
|
||||
Some(db_conn.clone()),
|
||||
influxdb_client.clone(),
|
||||
30,
|
||||
1,
|
||||
BILLING_PERIOD_SECONDS,
|
||||
rpc_account_shutdown_recevier,
|
||||
)? {
|
||||
// since the database entries are used for accounting, we want to be sure everything is saved before exiting
|
||||
important_background_handles.push(emitter_spawn.background_handle);
|
||||
|
||||
Some(emitter_spawn.stat_sender)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Basically spawn the full app, look at web3_proxy CLI
|
||||
|
||||
while true {
|
||||
|
||||
// (1) Load a batch of rows out of the old table until no more rows are left
|
||||
let old_records = rpc_accounting::Entity::find().limit(10000).all(db_conn).await?;
|
||||
let old_records = rpc_accounting::Entity::find()
|
||||
.filter(rpc_accounting::Column::Migrated.is_null())
|
||||
.limit(number_of_rows_to_process_at_once)
|
||||
.all(db_conn)
|
||||
.await?;
|
||||
if old_records.len() == 0 {
|
||||
// Break out of while loop once all records have successfully been migrated ...
|
||||
warn!("All records seem to have been successfully migrated!");
|
||||
@ -29,93 +137,208 @@ impl MigrateStatsToV2 {
|
||||
}
|
||||
|
||||
// (2) Create request metadata objects to match the old data
|
||||
let mut global_timeseries_buffer = HashMap::<RpcQueryKey, BufferedRpcQueryStats>::new();
|
||||
let mut opt_in_timeseries_buffer = HashMap::<RpcQueryKey, BufferedRpcQueryStats>::new();
|
||||
let mut accounting_db_buffer = HashMap::<RpcQueryKey, BufferedRpcQueryStats>::new();
|
||||
|
||||
// Iterate through all old rows, and put them into the above objects.
|
||||
for x in old_records {
|
||||
for x in old_records.iter() {
|
||||
// info!("Preparing for migration: {:?}", x);
|
||||
|
||||
info!("Preparing for migration: {:?}", x);
|
||||
// TODO: Split up a single request into multiple requests ...
|
||||
// according to frontend-requests, backend-requests, etc.
|
||||
|
||||
// // For each of the old rows, create a (i) RpcQueryKey and a matching BufferedRpcQueryStats object
|
||||
// let key = RpcQueryKey {
|
||||
// response_timestamp: x.period_datetime.timestamp(),
|
||||
// archive_needed: x.archive_needed,
|
||||
// error_response: x.error_response,
|
||||
// period_datetime: x.period_datetime.timestamp(),
|
||||
// rpc_secret_key_id: x.rpc_key_id,
|
||||
// origin: x.origin,
|
||||
// method: x.method
|
||||
// };
|
||||
//
|
||||
// // Create the corresponding BufferedRpcQueryStats object
|
||||
// let val = BufferedRpcQueryStats {
|
||||
// frontend_requests: x.frontend_requests,
|
||||
// backend_requests: x.backend_requests,
|
||||
// backend_retries: x.backend_retries,
|
||||
// no_servers: x.no_servers,
|
||||
// cache_misses: x.cache_misses,
|
||||
// cache_hits: x.cache_hits,
|
||||
// sum_request_bytes: x.sum_request_bytes,
|
||||
// sum_response_bytes: x.sum_response_bytes,
|
||||
// sum_response_millis: x.sum_response_millis
|
||||
// };
|
||||
//
|
||||
// // TODO: Create authorization, request metadata, and bytes ... but bytes we don't really keep track of!
|
||||
// // We can generate dummy bytes of the same length though, this may work as well
|
||||
//
|
||||
// // TODO: Period datetime is also a question of what it is
|
||||
// // let response_stat = RpcQueryStats::new(
|
||||
// // x.method,
|
||||
// // authorization.clone(),
|
||||
// // request_metadata.clone(),
|
||||
// // response_bytes,
|
||||
// // x.period_datetime
|
||||
// // );
|
||||
//
|
||||
// // BufferedRpcQueryStats
|
||||
// Get the rpc-key from the rpc_key_id
|
||||
// Get the user-id from the rpc_key_id
|
||||
let authorization_checks = match x.rpc_key_id {
|
||||
Some(rpc_key_id) => {
|
||||
let rpc_key_obj = rpc_key::Entity::find()
|
||||
.filter(rpc_key::Column::Id.eq(rpc_key_id))
|
||||
.one(db_conn)
|
||||
.await?
|
||||
.context("Could not find rpc_key_obj for the given rpc_key_id")?;
|
||||
|
||||
// TODO: Create authrization
|
||||
// We can probably also randomly generate this, as we don't care about the user (?)
|
||||
AuthorizationChecks {
|
||||
user_id: rpc_key_obj.user_id,
|
||||
rpc_secret_key: Some(RpcSecretKey::Uuid(rpc_key_obj.secret_key)),
|
||||
rpc_secret_key_id: Some(
|
||||
NonZeroU64::new(rpc_key_id)
|
||||
.context("Could not use rpc_key_id to create a u64")?,
|
||||
),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
None => AuthorizationChecks {
|
||||
..Default::default()
|
||||
},
|
||||
};
|
||||
|
||||
// Then overwrite rpc_key_id and user_id (?)
|
||||
let authorization_type = AuthorizationType::Frontend;
|
||||
let authorization = Arc::new(
|
||||
Authorization::try_new(
|
||||
authorization_checks,
|
||||
None,
|
||||
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
authorization_type,
|
||||
)
|
||||
.context("Initializing Authorization Struct was not successful")?,
|
||||
);
|
||||
|
||||
// It will be like a fork basically (to simulate getting multiple single requests ...)
|
||||
// Iterate through all frontend requests
|
||||
// For each frontend request, create one object that will be emitted (make sure the timestamp is new)
|
||||
let n = x.frontend_requests;
|
||||
|
||||
for i in 0..n {
|
||||
// info!("Creating a new frontend request");
|
||||
|
||||
// Collect all requests here ...
|
||||
let mut int_request_bytes = (x.sum_request_bytes / n);
|
||||
if i == 0 {
|
||||
int_request_bytes += (x.sum_request_bytes % n);
|
||||
}
|
||||
|
||||
let mut int_response_bytes = (x.sum_response_bytes / n);
|
||||
if i == 0 {
|
||||
int_response_bytes += (x.sum_response_bytes % n);
|
||||
}
|
||||
|
||||
let mut int_response_millis = (x.sum_response_millis / n);
|
||||
if i == 0 {
|
||||
int_response_millis += (x.sum_response_millis % n);
|
||||
}
|
||||
|
||||
let mut int_backend_requests = (x.backend_requests / n);
|
||||
if i == 0 {
|
||||
int_backend_requests += (x.backend_requests % n);
|
||||
}
|
||||
|
||||
// Add module at the last step to include for any remained that we missed ... (?)
|
||||
|
||||
// TODO: Create RequestMetadata
|
||||
let request_metadata = RequestMetadata {
|
||||
start_instant: Instant::now(), // This is overwritten later on
|
||||
request_bytes: int_request_bytes.into(), // Get the mean of all the request bytes
|
||||
archive_request: x.archive_request.into(),
|
||||
backend_requests: Default::default(), // This is not used, instead we modify the field later
|
||||
no_servers: 0.into(), // This is not relevant in the new version
|
||||
error_response: x.error_response.into(),
|
||||
response_bytes: int_response_bytes.into(),
|
||||
response_millis: int_response_millis.into(),
|
||||
// We just don't have this data
|
||||
response_from_backup_rpc: false.into(), // I think we did not record this back then // Default::default()
|
||||
};
|
||||
|
||||
// (3) Send through a channel to a stat emitter
|
||||
// Send it to the stats sender
|
||||
if let Some(stat_sender_ref) = stat_sender.as_ref() {
|
||||
// info!("Method is: {:?}", x.clone().method);
|
||||
let mut response_stat = RpcQueryStats::new(
|
||||
x.clone().method,
|
||||
authorization.clone(),
|
||||
Arc::new(request_metadata),
|
||||
(int_response_bytes)
|
||||
.try_into()
|
||||
.context("sum bytes average is not calculated properly")?,
|
||||
);
|
||||
// Modify the timestamps ..
|
||||
response_stat.modify_struct(
|
||||
int_response_millis,
|
||||
x.period_datetime.timestamp(), // I suppose timestamp is millis as well ... should check this in the (prod) database
|
||||
int_backend_requests,
|
||||
);
|
||||
// info!("Sending stats: {:?}", response_stat);
|
||||
stat_sender_ref
|
||||
// .send(response_stat.into())
|
||||
.send_async(response_stat.into())
|
||||
.await
|
||||
.context("stat_sender sending response_stat")?;
|
||||
// info!("Send! {:?}", stat_sender);
|
||||
} else {
|
||||
panic!("Stat sender was not spawned!");
|
||||
}
|
||||
|
||||
// Create a new stats object
|
||||
// Add it to the StatBuffer
|
||||
}
|
||||
|
||||
// Let the stat_sender spawn / flush ...
|
||||
// spawned_app.app.stat_sender.aggregate_and_save_loop()
|
||||
// Send a signal to save ...
|
||||
}
|
||||
|
||||
// (3) Await that all items are properly processed
|
||||
// TODO: Await all the background handles
|
||||
|
||||
// Only after this mark all the items as processed / completed
|
||||
|
||||
// If the items are in rpc_v2, delete the initial items from the database
|
||||
|
||||
// return Ok(());
|
||||
|
||||
// (4) Update the batch in the old table with the current timestamp (Mark the batch as migrated)
|
||||
let old_record_ids = old_records.iter().map(|x| x.id);
|
||||
let update_result: UpdateResult = rpc_accounting::Entity::update_many()
|
||||
.col_expr(
|
||||
rpc_accounting::Column::Migrated,
|
||||
Expr::value(Value::ChronoDateTimeUtc(Some(Box::new(
|
||||
chrono::offset::Utc::now(),
|
||||
)))),
|
||||
)
|
||||
.filter(rpc_accounting::Column::Id.is_in(old_record_ids))
|
||||
// .set(pear)
|
||||
.exec(db_conn)
|
||||
.await?;
|
||||
|
||||
info!("Update result is: {:?}", update_result);
|
||||
|
||||
// (N-1) Mark the batch as migrated
|
||||
// break;
|
||||
}
|
||||
|
||||
info!(
|
||||
"Background handles (2) are: {:?}",
|
||||
important_background_handles
|
||||
);
|
||||
|
||||
// Drop the handle
|
||||
// Send the shutdown signal here (?)
|
||||
// important_background_handles.clear();
|
||||
|
||||
// Finally also send a shutdown signal
|
||||
|
||||
drop(stat_sender);
|
||||
// match app_shutdown_sender.send(()) {
|
||||
// Err(x) => {
|
||||
// panic!("Could not send shutdown signal! {:?}", x);
|
||||
// }
|
||||
// _ => {}
|
||||
// };
|
||||
|
||||
// (3) Update the batch in the old table with the current timestamp
|
||||
// TODO: Should we also write a short verifier if the migration was successful (?)
|
||||
|
||||
// (4) Send through a channel to a stat emitter
|
||||
// Drop the background handle, wait for any tasks that are on-going
|
||||
while let Some(x) = important_background_handles.next().await {
|
||||
info!("Returned item is: {:?}", x);
|
||||
match x {
|
||||
Err(e) => {
|
||||
error!("{:?}", e);
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
error!("{:?}", e);
|
||||
}
|
||||
Ok(Ok(_)) => {
|
||||
// TODO: how can we know which handle exited?
|
||||
info!("a background handle exited");
|
||||
// Pop it in this case?
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
// let old_address: Address = self.old_address.parse()?;
|
||||
// let new_address: Address = self.new_address.parse()?;
|
||||
//
|
||||
// let old_address: Vec<u8> = old_address.to_fixed_bytes().into();
|
||||
// let new_address: Vec<u8> = new_address.to_fixed_bytes().into();
|
||||
//
|
||||
// let u = user::Entity::find()
|
||||
// .filter(user::Column::Address.eq(old_address))
|
||||
// .one(db_conn)
|
||||
// .await?
|
||||
// .context("No user found with that address")?;
|
||||
//
|
||||
// debug!("initial user: {:#?}", u);
|
||||
//
|
||||
// if u.address == new_address {
|
||||
// info!("user already has this address");
|
||||
// } else {
|
||||
// let mut u = u.into_active_model();
|
||||
//
|
||||
// u.address = sea_orm::Set(new_address);
|
||||
//
|
||||
// let u = u.save(db_conn).await?;
|
||||
//
|
||||
// info!("changed user address");
|
||||
//
|
||||
// debug!("updated user: {:#?}", u);
|
||||
// }
|
||||
// info!("Here (?)");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -178,7 +178,9 @@ pub async fn block_needed(
|
||||
// TODO: think about this more
|
||||
// TODO: jsonrpc has a specific code for this
|
||||
// TODO: this shouldn't be a 500. this should be a 400. 500 will make haproxy retry a bunch
|
||||
let obj = params[0]
|
||||
let obj = params
|
||||
.get_mut(0)
|
||||
.ok_or_else(|| anyhow::anyhow!("invalid format. no params"))?
|
||||
.as_object_mut()
|
||||
.ok_or_else(|| Web3ProxyError::BadRequest("invalid format".to_string()))?;
|
||||
|
||||
|
@ -17,10 +17,10 @@ use axum_client_ip::InsecureClientIp;
|
||||
use axum_macros::debug_handler;
|
||||
use chrono::{TimeZone, Utc};
|
||||
use entities::{admin_trail, login, pending_login, rpc_key, user};
|
||||
use ethers::{abi::AbiEncode, prelude::Address, types::Bytes};
|
||||
use ethers::{prelude::Address, types::Bytes};
|
||||
use hashbrown::HashMap;
|
||||
use http::StatusCode;
|
||||
use log::{debug, warn};
|
||||
use log::{debug, info, warn};
|
||||
use migration::sea_orm::prelude::Uuid;
|
||||
use migration::sea_orm::{
|
||||
self, ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter,
|
||||
@ -136,6 +136,8 @@ pub async fn admin_login_get(
|
||||
resources: vec![],
|
||||
};
|
||||
|
||||
let admin_address: Vec<u8> = admin_address.to_fixed_bytes().into();
|
||||
|
||||
let db_conn = app.db_conn().web3_context("login requires a database")?;
|
||||
let db_replica = app
|
||||
.db_replica()
|
||||
@ -151,8 +153,11 @@ pub async fn admin_login_get(
|
||||
"Could not find user in db".to_string(),
|
||||
))?;
|
||||
|
||||
// TODO: Gotta check if encoding messes up things maybe ...
|
||||
info!("Admin address is: {:?}", admin_address);
|
||||
info!("Encoded admin address is: {:?}", admin_address);
|
||||
let admin = user::Entity::find()
|
||||
.filter(user::Column::Address.eq(admin_address.encode()))
|
||||
.filter(user::Column::Address.eq(admin_address))
|
||||
.one(db_replica.conn())
|
||||
.await?
|
||||
.ok_or(Web3ProxyError::BadRequest(
|
||||
|
@ -65,9 +65,13 @@ pub enum Web3ProxyError {
|
||||
#[from(ignore)]
|
||||
IpNotAllowed(IpAddr),
|
||||
JoinError(JoinError),
|
||||
#[display(fmt = "{:?}", _0)]
|
||||
#[error(ignore)]
|
||||
JsonRpc(crate::jsonrpc::JsonRpcErrorData),
|
||||
MsgPackEncode(rmp_serde::encode::Error),
|
||||
NoBlockNumberOrHash,
|
||||
NoBlocksKnown,
|
||||
NoConsensusHeadBlock,
|
||||
NoHandleReady,
|
||||
NoServersSynced,
|
||||
#[display(fmt = "{}/{}", num_known, min_head_rpcs)]
|
||||
@ -409,6 +413,17 @@ impl Web3ProxyError {
|
||||
),
|
||||
)
|
||||
}
|
||||
Self::JsonRpc(err) => {
|
||||
debug!("JsonRpc err={:?}", err);
|
||||
(
|
||||
StatusCode::BAD_REQUEST,
|
||||
JsonRpcForwardedResponse::from_str(
|
||||
"json rpc error!",
|
||||
Some(StatusCode::BAD_REQUEST.as_u16().into()),
|
||||
None,
|
||||
),
|
||||
)
|
||||
}
|
||||
Self::MsgPackEncode(err) => {
|
||||
debug!("MsgPackEncode Error: {}", err);
|
||||
(
|
||||
@ -442,6 +457,17 @@ impl Web3ProxyError {
|
||||
),
|
||||
)
|
||||
}
|
||||
Self::NoConsensusHeadBlock => {
|
||||
error!("NoConsensusHeadBlock");
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
JsonRpcForwardedResponse::from_str(
|
||||
"no consensus head block",
|
||||
Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
|
||||
None,
|
||||
),
|
||||
)
|
||||
}
|
||||
Self::NoHandleReady => {
|
||||
error!("NoHandleReady");
|
||||
(
|
||||
|
@ -238,13 +238,18 @@ pub async fn serve(
|
||||
let server = axum::Server::bind(&addr)
|
||||
// TODO: option to use with_connect_info. we want it in dev, but not when running behind a proxy, but not
|
||||
.serve(service)
|
||||
.with_graceful_shutdown(async move {
|
||||
let _ = shutdown_receiver.recv().await;
|
||||
})
|
||||
.await
|
||||
.map_err(Into::into);
|
||||
// <<<<<<< HEAD
|
||||
// .with_graceful_shutdown(async move {
|
||||
// let _ = shutdown_receiver.recv().await;
|
||||
// })
|
||||
// .await
|
||||
// .map_err(Into::into);
|
||||
//
|
||||
// let _ = shutdown_complete_sender.send(());
|
||||
//
|
||||
// server
|
||||
// =======
|
||||
.await?;
|
||||
|
||||
let _ = shutdown_complete_sender.send(());
|
||||
|
||||
server
|
||||
Ok(())
|
||||
}
|
||||
|
@ -99,15 +99,6 @@ async fn _proxy_web3_rpc(
|
||||
.expect("W3P-BACKEND-RPCS should always parse"),
|
||||
);
|
||||
|
||||
// TODO: add a header if a backend rpc was used
|
||||
|
||||
headers.insert(
|
||||
"X-W3P-CLIENT-IP",
|
||||
ip.to_string()
|
||||
.parse()
|
||||
.expect("X-CLIENT-IP should always parse"),
|
||||
);
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
@ -272,13 +263,6 @@ async fn _proxy_web3_rpc_with_key(
|
||||
.expect("W3P-BACKEND-RPCS should always parse"),
|
||||
);
|
||||
|
||||
headers.insert(
|
||||
"X-W3P-CLIENT-IP",
|
||||
ip.to_string()
|
||||
.parse()
|
||||
.expect("X-CLIENT-IP should always parse"),
|
||||
);
|
||||
|
||||
if let Some(rpc_secret_key_id) = rpc_secret_key_id {
|
||||
headers.insert(
|
||||
"X-W3P-KEY-ID",
|
||||
|
@ -397,7 +397,7 @@ async fn handle_socket_payload(
|
||||
|
||||
if let Some(stat_sender) = app.stat_sender.as_ref() {
|
||||
let response_stat = RpcQueryStats::new(
|
||||
json_request.method.clone(),
|
||||
Some(json_request.method.clone()),
|
||||
authorization.clone(),
|
||||
request_metadata,
|
||||
response.num_bytes(),
|
||||
|
@ -7,6 +7,8 @@ use super::{FrontendHealthCache, FrontendJsonResponseCache, FrontendResponseCach
|
||||
use crate::app::{Web3ProxyApp, APP_USER_AGENT};
|
||||
use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
|
||||
use axum_macros::debug_handler;
|
||||
use hashbrown::HashMap;
|
||||
use http::HeaderMap;
|
||||
use serde_json::json;
|
||||
use std::sync::Arc;
|
||||
|
||||
@ -38,11 +40,13 @@ pub async fn status(
|
||||
let body = response_cache
|
||||
.get_with(FrontendResponseCaches::Status, async {
|
||||
// TODO: what else should we include? uptime, cache hit rates, cpu load, memory used
|
||||
// TODO: the hostname is probably not going to change. only get once at the start?
|
||||
let body = json!({
|
||||
"version": APP_USER_AGENT,
|
||||
"chain_id": app.config.chain_id,
|
||||
"balanced_rpcs": app.balanced_rpcs,
|
||||
"private_rpcs": app.private_rpcs,
|
||||
"hostname": app.hostname,
|
||||
});
|
||||
|
||||
Arc::new(body)
|
||||
|
@ -281,7 +281,7 @@ impl JsonRpcForwardedResponse {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
unimplemented!();
|
||||
return Err(anyhow::anyhow!("unexpected ethers error!").into());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ use log::{debug, error, warn};
|
||||
use pagerduty_rs::eventsv2sync::EventsV2 as PagerdutySyncEventsV2;
|
||||
use pagerduty_rs::types::{AlertTrigger, AlertTriggerPayload, Event};
|
||||
use serde::Serialize;
|
||||
use std::backtrace::Backtrace;
|
||||
use std::{
|
||||
collections::hash_map::DefaultHasher,
|
||||
hash::{Hash, Hasher},
|
||||
@ -71,7 +72,10 @@ pub fn panic_handler(
|
||||
) {
|
||||
let summary = format!("{}", panic_info);
|
||||
|
||||
let details = format!("{:#?}", panic_info);
|
||||
let backtrace = Backtrace::force_capture();
|
||||
|
||||
// TODO: try to send to sentry and then put the sentry link into the page
|
||||
let details = format!("{:#?}\n{:#?}", panic_info, backtrace);
|
||||
|
||||
if summary.starts_with("panicked at 'WS Server panic") {
|
||||
// the ethers-rs library panics when websockets disconnect. this isn't a panic we care about reporting
|
||||
|
@ -13,6 +13,8 @@ use moka::future::Cache;
|
||||
use serde::ser::SerializeStruct;
|
||||
use serde::Serialize;
|
||||
use serde_json::json;
|
||||
use std::hash::Hash;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
use std::{cmp::Ordering, fmt::Display, sync::Arc};
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::time::Duration;
|
||||
@ -65,6 +67,14 @@ impl PartialEq for Web3ProxyBlock {
|
||||
}
|
||||
}
|
||||
|
||||
impl Eq for Web3ProxyBlock {}
|
||||
|
||||
impl Hash for Web3ProxyBlock {
|
||||
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
|
||||
self.block.hash.hash(state);
|
||||
}
|
||||
}
|
||||
|
||||
impl Web3ProxyBlock {
|
||||
/// A new block has arrived over a subscription
|
||||
pub fn try_new(block: ArcBlock) -> Option<Self> {
|
||||
@ -244,7 +254,13 @@ impl Web3Rpcs {
|
||||
)
|
||||
.await?;
|
||||
|
||||
let block = response.result.web3_context("failed fetching block")?;
|
||||
if let Some(err) = response.error {
|
||||
return Err(err).web3_context("failed fetching block");
|
||||
}
|
||||
|
||||
let block = response
|
||||
.result
|
||||
.web3_context("no error, but also no block")?;
|
||||
|
||||
let block: Option<ArcBlock> = serde_json::from_str(block.get())?;
|
||||
|
||||
@ -378,7 +394,10 @@ impl Web3Rpcs {
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!("unable to process block from rpc {}: {:#?}", rpc_name, err);
|
||||
warn!(
|
||||
"error while processing block from rpc {}: {:#?}",
|
||||
rpc_name, err
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
@ -410,41 +429,32 @@ impl Web3Rpcs {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let new_consensus = consensus_finder
|
||||
.best_consensus_connections(authorization, self)
|
||||
let new_synced_connections = match consensus_finder
|
||||
.find_consensus_connections(authorization, self)
|
||||
.await
|
||||
.web3_context("no consensus head block!")
|
||||
.map_err(|err| {
|
||||
self.watch_consensus_rpcs_sender.send_replace(None);
|
||||
|
||||
err
|
||||
})?;
|
||||
|
||||
// TODO: what should we do if the block number of new_consensus is < old_synced_connections? wait?
|
||||
{
|
||||
Err(err) => {
|
||||
return Err(err).web3_context("error while finding consensus head block!");
|
||||
}
|
||||
Ok(None) => {
|
||||
return Err(Web3ProxyError::NoConsensusHeadBlock);
|
||||
}
|
||||
Ok(Some(x)) => x,
|
||||
};
|
||||
|
||||
let watch_consensus_head_sender = self.watch_consensus_head_sender.as_ref().unwrap();
|
||||
let consensus_tier = new_consensus.tier;
|
||||
let total_tiers = consensus_finder.len();
|
||||
let backups_needed = new_consensus.backups_needed;
|
||||
let consensus_head_block = new_consensus.head_block.clone();
|
||||
let num_consensus_rpcs = new_consensus.num_conns();
|
||||
let mut num_synced_rpcs = 0;
|
||||
let num_active_rpcs = consensus_finder
|
||||
.all_rpcs_group()
|
||||
.map(|x| {
|
||||
for v in x.rpc_to_block.values() {
|
||||
if *v == consensus_head_block {
|
||||
num_synced_rpcs += 1;
|
||||
}
|
||||
}
|
||||
x.len()
|
||||
})
|
||||
.unwrap_or_default();
|
||||
let consensus_tier = new_synced_connections.tier;
|
||||
// TODO: think more about this unwrap
|
||||
let total_tiers = consensus_finder.worst_tier().unwrap_or(10);
|
||||
let backups_needed = new_synced_connections.backups_needed;
|
||||
let consensus_head_block = new_synced_connections.head_block.clone();
|
||||
let num_consensus_rpcs = new_synced_connections.num_conns();
|
||||
let num_active_rpcs = consensus_finder.len();
|
||||
let total_rpcs = self.by_name.read().len();
|
||||
|
||||
let old_consensus_head_connections = self
|
||||
.watch_consensus_rpcs_sender
|
||||
.send_replace(Some(Arc::new(new_consensus)));
|
||||
.send_replace(Some(Arc::new(new_synced_connections)));
|
||||
|
||||
let backups_voted_str = if backups_needed { "B " } else { "" };
|
||||
|
||||
@ -492,12 +502,11 @@ impl Web3Rpcs {
|
||||
// no change in hash. no need to use watch_consensus_head_sender
|
||||
// TODO: trace level if rpc is backup
|
||||
debug!(
|
||||
"con {}/{} {}{}/{}/{}/{} con={} rpc={}@{}",
|
||||
"con {}/{} {}{}/{}/{} con={} rpc={}@{}",
|
||||
consensus_tier,
|
||||
total_tiers,
|
||||
backups_voted_str,
|
||||
num_consensus_rpcs,
|
||||
num_synced_rpcs,
|
||||
num_active_rpcs,
|
||||
total_rpcs,
|
||||
consensus_head_block,
|
||||
@ -506,18 +515,13 @@ impl Web3Rpcs {
|
||||
)
|
||||
} else {
|
||||
// hash changed
|
||||
if backups_needed {
|
||||
// TODO: what else should be in this error?
|
||||
warn!("Backup RPCs are in use!");
|
||||
}
|
||||
|
||||
debug!(
|
||||
"unc {}/{} {}{}/{}/{}/{} con_head={} old={} rpc={}@{}",
|
||||
"unc {}/{} {}{}/{}/{} con_head={} old={} rpc={}@{}",
|
||||
consensus_tier,
|
||||
total_tiers,
|
||||
backups_voted_str,
|
||||
num_consensus_rpcs,
|
||||
num_synced_rpcs,
|
||||
num_active_rpcs,
|
||||
total_rpcs,
|
||||
consensus_head_block,
|
||||
@ -541,12 +545,11 @@ impl Web3Rpcs {
|
||||
// this is unlikely but possible
|
||||
// TODO: better log
|
||||
warn!(
|
||||
"chain rolled back {}/{} {}{}/{}/{}/{} con={} old={} rpc={}@{}",
|
||||
"chain rolled back {}/{} {}{}/{}/{} con={} old={} rpc={}@{}",
|
||||
consensus_tier,
|
||||
total_tiers,
|
||||
backups_voted_str,
|
||||
num_consensus_rpcs,
|
||||
num_synced_rpcs,
|
||||
num_active_rpcs,
|
||||
total_rpcs,
|
||||
consensus_head_block,
|
||||
@ -575,12 +578,11 @@ impl Web3Rpcs {
|
||||
}
|
||||
Ordering::Greater => {
|
||||
debug!(
|
||||
"new {}/{} {}{}/{}/{}/{} con={} rpc={}@{}",
|
||||
"new {}/{} {}{}/{}/{} con={} rpc={}@{}",
|
||||
consensus_tier,
|
||||
total_tiers,
|
||||
backups_voted_str,
|
||||
num_consensus_rpcs,
|
||||
num_synced_rpcs,
|
||||
num_active_rpcs,
|
||||
total_rpcs,
|
||||
consensus_head_block,
|
||||
|
@ -5,10 +5,11 @@ use crate::frontend::authorization::Authorization;
|
||||
use crate::frontend::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
|
||||
use ethers::prelude::{H256, U64};
|
||||
use hashbrown::{HashMap, HashSet};
|
||||
use itertools::{Itertools, MinMaxResult};
|
||||
use log::{debug, trace, warn};
|
||||
use moka::future::Cache;
|
||||
use serde::Serialize;
|
||||
use std::collections::BTreeMap;
|
||||
use std::cmp::Reverse;
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
use tokio::time::Instant;
|
||||
@ -17,27 +18,18 @@ use tokio::time::Instant;
|
||||
/// Serialize is so we can print it on our debug endpoint
|
||||
#[derive(Clone, Serialize)]
|
||||
pub struct ConsensusWeb3Rpcs {
|
||||
// TODO: tier should be an option, or we should have consensus be stored as an Option<ConsensusWeb3Rpcs>
|
||||
pub(super) tier: u64,
|
||||
pub(super) head_block: Web3ProxyBlock,
|
||||
// pub tier: u64,
|
||||
// pub head_block: Option<Web3ProxyBlock>,
|
||||
// TODO: this should be able to serialize, but it isn't
|
||||
#[serde(skip_serializing)]
|
||||
pub rpcs: Vec<Arc<Web3Rpc>>,
|
||||
pub backups_voted: Option<Web3ProxyBlock>,
|
||||
pub backups_needed: bool,
|
||||
pub(super) best_rpcs: Vec<Arc<Web3Rpc>>,
|
||||
// TODO: functions like "compare_backup_vote()"
|
||||
// pub(super) backups_voted: Option<Web3ProxyBlock>,
|
||||
pub(super) backups_needed: bool,
|
||||
}
|
||||
|
||||
impl ConsensusWeb3Rpcs {
|
||||
#[inline(always)]
|
||||
pub fn num_conns(&self) -> usize {
|
||||
self.rpcs.len()
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn sum_soft_limit(&self) -> u32 {
|
||||
self.rpcs.iter().fold(0, |sum, rpc| sum + rpc.soft_limit)
|
||||
self.best_rpcs.len()
|
||||
}
|
||||
|
||||
// TODO: sum_hard_limit?
|
||||
@ -49,7 +41,7 @@ impl fmt::Debug for ConsensusWeb3Rpcs {
|
||||
// TODO: print the actual conns?
|
||||
f.debug_struct("ConsensusWeb3Rpcs")
|
||||
.field("head_block", &self.head_block)
|
||||
.field("num_rpcs", &self.rpcs.len())
|
||||
.field("num_conns", &self.best_rpcs.len())
|
||||
.finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
@ -76,7 +68,7 @@ impl Web3Rpcs {
|
||||
let consensus = self.watch_consensus_rpcs_sender.borrow();
|
||||
|
||||
if let Some(consensus) = consensus.as_ref() {
|
||||
!consensus.rpcs.is_empty()
|
||||
!consensus.best_rpcs.is_empty()
|
||||
} else {
|
||||
false
|
||||
}
|
||||
@ -86,7 +78,7 @@ impl Web3Rpcs {
|
||||
let consensus = self.watch_consensus_rpcs_sender.borrow();
|
||||
|
||||
if let Some(consensus) = consensus.as_ref() {
|
||||
consensus.rpcs.len()
|
||||
consensus.best_rpcs.len()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
@ -95,50 +87,51 @@ impl Web3Rpcs {
|
||||
|
||||
type FirstSeenCache = Cache<H256, Instant, hashbrown::hash_map::DefaultHashBuilder>;
|
||||
|
||||
pub struct ConnectionsGroup {
|
||||
pub rpc_to_block: HashMap<Arc<Web3Rpc>, Web3ProxyBlock>,
|
||||
// TODO: what if there are two blocks with the same number?
|
||||
pub highest_block: Option<Web3ProxyBlock>,
|
||||
/// A ConsensusConnections builder that tracks all connection heads across multiple groups of servers
|
||||
pub struct ConsensusFinder {
|
||||
/// backups for all tiers are only used if necessary
|
||||
/// tiers[0] = only tier 0.
|
||||
/// tiers[1] = tier 0 and tier 1
|
||||
/// tiers[n] = tier 0..=n
|
||||
/// This is a BTreeMap and not a Vec because sometimes a tier is empty
|
||||
rpc_heads: HashMap<Arc<Web3Rpc>, Web3ProxyBlock>,
|
||||
/// never serve blocks that are too old
|
||||
max_block_age: Option<u64>,
|
||||
/// tier 0 will be prefered as long as the distance between it and the other tiers is <= max_tier_lag
|
||||
max_block_lag: Option<U64>,
|
||||
/// used to track rpc.head_latency. The same cache should be shared between all ConnectionsGroups
|
||||
first_seen: FirstSeenCache,
|
||||
}
|
||||
|
||||
impl ConnectionsGroup {
|
||||
pub fn new(first_seen: FirstSeenCache) -> Self {
|
||||
impl ConsensusFinder {
|
||||
pub fn new(max_block_age: Option<u64>, max_block_lag: Option<U64>) -> Self {
|
||||
// TODO: what's a good capacity for this? it shouldn't need to be very large
|
||||
// TODO: if we change Web3ProxyBlock to store the instance, i think we could use the block_by_hash cache
|
||||
let first_seen = Cache::builder()
|
||||
.max_capacity(16)
|
||||
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
|
||||
|
||||
// TODO: hard coding 0-9 isn't great, but its easier than refactoring this to be smart about config reloading
|
||||
let rpc_heads = HashMap::new();
|
||||
|
||||
Self {
|
||||
rpc_to_block: Default::default(),
|
||||
highest_block: Default::default(),
|
||||
rpc_heads,
|
||||
max_block_age,
|
||||
max_block_lag,
|
||||
first_seen,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.rpc_to_block.len()
|
||||
self.rpc_heads.len()
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.rpc_to_block.is_empty()
|
||||
self.rpc_heads.is_empty()
|
||||
}
|
||||
|
||||
fn remove(&mut self, rpc: &Arc<Web3Rpc>) -> Option<Web3ProxyBlock> {
|
||||
if let Some(removed_block) = self.rpc_to_block.remove(rpc) {
|
||||
match self.highest_block.as_mut() {
|
||||
None => {}
|
||||
Some(current_highest_block) => {
|
||||
if removed_block.hash() == current_highest_block.hash() {
|
||||
for maybe_highest_block in self.rpc_to_block.values() {
|
||||
if maybe_highest_block.number() > current_highest_block.number() {
|
||||
*current_highest_block = maybe_highest_block.clone();
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Some(removed_block)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
self.rpc_heads.remove(rpc)
|
||||
}
|
||||
|
||||
async fn insert(&mut self, rpc: Arc<Web3Rpc>, block: Web3ProxyBlock) -> Option<Web3ProxyBlock> {
|
||||
@ -153,306 +146,7 @@ impl ConnectionsGroup {
|
||||
|
||||
rpc.head_latency.write().record(latency);
|
||||
|
||||
// TODO: what about a reorg to the same height?
|
||||
if Some(block.number()) > self.highest_block.as_ref().map(|x| x.number()) {
|
||||
self.highest_block = Some(block.clone());
|
||||
}
|
||||
|
||||
self.rpc_to_block.insert(rpc, block)
|
||||
}
|
||||
|
||||
/// min_consensus_block_num keeps us from ever going backwards.
|
||||
/// TODO: think about min_consensus_block_num more. i think this might cause an outage if the chain is doing weird things. but 503s is probably better than broken data.
|
||||
pub(self) async fn consensus_head_connections(
|
||||
&self,
|
||||
authorization: &Arc<Authorization>,
|
||||
web3_rpcs: &Web3Rpcs,
|
||||
min_consensus_block_num: Option<U64>,
|
||||
tier: &u64,
|
||||
) -> Web3ProxyResult<ConsensusWeb3Rpcs> {
|
||||
let mut maybe_head_block = match self.highest_block.clone() {
|
||||
None => return Err(Web3ProxyError::NoBlocksKnown),
|
||||
Some(x) => x,
|
||||
};
|
||||
|
||||
// TODO: take max_distance_consensus_to_highest as an argument?
|
||||
// TODO: what if someone's backup node is misconfigured and goes on a really fast forked chain?
|
||||
let max_lag_consensus_to_highest =
|
||||
if let Some(min_consensus_block_num) = min_consensus_block_num {
|
||||
maybe_head_block
|
||||
.number()
|
||||
.saturating_add(1.into())
|
||||
.saturating_sub(min_consensus_block_num)
|
||||
.as_u64()
|
||||
} else {
|
||||
10
|
||||
};
|
||||
|
||||
trace!(
|
||||
"max_lag_consensus_to_highest: {}",
|
||||
max_lag_consensus_to_highest
|
||||
);
|
||||
|
||||
let num_known = self.rpc_to_block.len();
|
||||
|
||||
if num_known < web3_rpcs.min_head_rpcs {
|
||||
return Err(Web3ProxyError::NotEnoughRpcs {
|
||||
num_known,
|
||||
min_head_rpcs: web3_rpcs.min_head_rpcs,
|
||||
});
|
||||
}
|
||||
|
||||
let mut primary_rpcs_voted: Option<Web3ProxyBlock> = None;
|
||||
let mut backup_rpcs_voted: Option<Web3ProxyBlock> = None;
|
||||
|
||||
// track rpcs on this heaviest chain so we can build a new ConsensusWeb3Rpcs
|
||||
let mut primary_consensus_rpcs = HashSet::<&str>::new();
|
||||
let mut backup_consensus_rpcs = HashSet::<&str>::new();
|
||||
|
||||
// a running total of the soft limits covered by the rpcs that agree on the head block
|
||||
let mut primary_sum_soft_limit: u32 = 0;
|
||||
let mut backup_sum_soft_limit: u32 = 0;
|
||||
|
||||
// TODO: also track the sum of *available* hard_limits. if any servers have no hard limits, use their soft limit or no limit?
|
||||
|
||||
// check the highest work block for a set of rpcs that can serve our request load
|
||||
// if it doesn't have enough rpcs for our request load, check the parent block
|
||||
// TODO: loop for how many parent blocks? we don't want to serve blocks that are too far behind. probably different per chain
|
||||
// TODO: this loop is pretty long. any way to clean up this code?
|
||||
for _ in 0..max_lag_consensus_to_highest {
|
||||
let maybe_head_hash = maybe_head_block.hash();
|
||||
|
||||
// find all rpcs with maybe_head_hash as their current head
|
||||
for (rpc, rpc_head) in self.rpc_to_block.iter() {
|
||||
if rpc_head.hash() != maybe_head_hash {
|
||||
// connection is not on the desired block
|
||||
continue;
|
||||
}
|
||||
let rpc_name = rpc.name.as_str();
|
||||
if backup_consensus_rpcs.contains(rpc_name) {
|
||||
// connection is on a later block in this same chain
|
||||
continue;
|
||||
}
|
||||
if primary_consensus_rpcs.contains(rpc_name) {
|
||||
// connection is on a later block in this same chain
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(rpc) = web3_rpcs.by_name.read().get(rpc_name) {
|
||||
if backup_rpcs_voted.is_some() {
|
||||
// backups already voted for a head block. don't change it
|
||||
} else {
|
||||
backup_consensus_rpcs.insert(rpc_name);
|
||||
backup_sum_soft_limit += rpc.soft_limit;
|
||||
}
|
||||
if !rpc.backup {
|
||||
primary_consensus_rpcs.insert(rpc_name);
|
||||
primary_sum_soft_limit += rpc.soft_limit;
|
||||
}
|
||||
} else {
|
||||
// i don't think this is an error. i think its just if a reconnect is currently happening
|
||||
if web3_rpcs.synced() {
|
||||
warn!("connection missing: {}", rpc_name);
|
||||
debug!("web3_rpcs.by_name: {:#?}", web3_rpcs.by_name);
|
||||
} else {
|
||||
return Err(Web3ProxyError::NoServersSynced);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if primary_sum_soft_limit >= web3_rpcs.min_sum_soft_limit
|
||||
&& primary_consensus_rpcs.len() >= web3_rpcs.min_head_rpcs
|
||||
{
|
||||
// we have enough servers with enough requests! yey!
|
||||
primary_rpcs_voted = Some(maybe_head_block.clone());
|
||||
break;
|
||||
}
|
||||
|
||||
if backup_rpcs_voted.is_none()
|
||||
&& backup_consensus_rpcs != primary_consensus_rpcs
|
||||
&& backup_sum_soft_limit >= web3_rpcs.min_sum_soft_limit
|
||||
&& backup_consensus_rpcs.len() >= web3_rpcs.min_head_rpcs
|
||||
{
|
||||
// if we include backup servers, we have enough servers with high enough limits
|
||||
backup_rpcs_voted = Some(maybe_head_block.clone());
|
||||
}
|
||||
|
||||
// not enough rpcs on this block. check the parent block
|
||||
match web3_rpcs
|
||||
.block(authorization, maybe_head_block.parent_hash(), None)
|
||||
.await
|
||||
{
|
||||
Ok(parent_block) => {
|
||||
// trace!(
|
||||
// child=%maybe_head_hash, parent=%parent_block.hash.unwrap(), "avoiding thundering herd. checking consensus on parent block",
|
||||
// );
|
||||
maybe_head_block = parent_block;
|
||||
continue;
|
||||
}
|
||||
Err(err) => {
|
||||
let soft_limit_percent = (primary_sum_soft_limit as f32
|
||||
/ web3_rpcs.min_sum_soft_limit as f32)
|
||||
* 100.0;
|
||||
|
||||
let err_msg = format!("ran out of parents to check. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{}). err: {:#?}",
|
||||
primary_consensus_rpcs.len(),
|
||||
num_known,
|
||||
web3_rpcs.min_head_rpcs,
|
||||
primary_sum_soft_limit,
|
||||
web3_rpcs.min_sum_soft_limit,
|
||||
soft_limit_percent,
|
||||
err,
|
||||
);
|
||||
|
||||
if backup_rpcs_voted.is_some() {
|
||||
warn!("{}", err_msg);
|
||||
break;
|
||||
} else {
|
||||
return Err(anyhow::anyhow!(err_msg).into());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: if consensus_head_rpcs.is_empty, try another method of finding the head block. will need to change the return Err above into breaks.
|
||||
|
||||
// we've done all the searching for the heaviest block that we can
|
||||
if (primary_consensus_rpcs.len() < web3_rpcs.min_head_rpcs
|
||||
|| primary_sum_soft_limit < web3_rpcs.min_sum_soft_limit)
|
||||
&& backup_rpcs_voted.is_none()
|
||||
{
|
||||
// if we get here, not enough servers are synced. return an error
|
||||
let soft_limit_percent =
|
||||
(primary_sum_soft_limit as f32 / web3_rpcs.min_sum_soft_limit as f32) * 100.0;
|
||||
|
||||
return Err(anyhow::anyhow!(
|
||||
"Not enough resources. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{})",
|
||||
primary_consensus_rpcs.len(),
|
||||
num_known,
|
||||
web3_rpcs.min_head_rpcs,
|
||||
primary_sum_soft_limit,
|
||||
web3_rpcs.min_sum_soft_limit,
|
||||
soft_limit_percent,
|
||||
)
|
||||
.into());
|
||||
}
|
||||
|
||||
// success! this block has enough soft limit and nodes on it (or on later blocks)
|
||||
let rpcs: Vec<Arc<Web3Rpc>> = primary_consensus_rpcs
|
||||
.into_iter()
|
||||
.filter_map(|conn_name| web3_rpcs.by_name.read().get(conn_name).cloned())
|
||||
.collect();
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
{
|
||||
let _ = maybe_head_block.hash();
|
||||
let _ = maybe_head_block.number();
|
||||
}
|
||||
|
||||
Ok(ConsensusWeb3Rpcs {
|
||||
tier: *tier,
|
||||
head_block: maybe_head_block,
|
||||
rpcs,
|
||||
backups_voted: backup_rpcs_voted,
|
||||
backups_needed: primary_rpcs_voted.is_none(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// A ConsensusWeb3Rpcs builder that tracks all connection heads across multiple groups of servers
|
||||
pub struct ConsensusFinder {
|
||||
/// backups for all tiers are only used if necessary
|
||||
/// tiers[0] = only tier 0.
|
||||
/// tiers[1] = tier 0 and tier 1
|
||||
/// tiers[n] = tier 0..=n
|
||||
/// This is a BTreeMap and not a Vec because sometimes a tier is empty
|
||||
tiers: BTreeMap<u64, ConnectionsGroup>,
|
||||
/// never serve blocks that are too old
|
||||
max_block_age: Option<u64>,
|
||||
/// tier 0 will be prefered as long as the distance between it and the other tiers is <= max_tier_lag
|
||||
max_block_lag: Option<U64>,
|
||||
}
|
||||
|
||||
impl ConsensusFinder {
|
||||
pub fn new(max_block_age: Option<u64>, max_block_lag: Option<U64>) -> Self {
|
||||
// TODO: what's a good capacity for this? it shouldn't need to be very large
|
||||
// TODO: if we change Web3ProxyBlock to store the instance, i think we could use the block_by_hash cache
|
||||
let first_seen = Cache::builder()
|
||||
.max_capacity(16)
|
||||
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
|
||||
|
||||
// TODO: hard coding 0-9 isn't great, but its easier than refactoring this to be smart about config reloading
|
||||
let tiers = (0..10)
|
||||
.map(|x| (x, ConnectionsGroup::new(first_seen.clone())))
|
||||
.collect();
|
||||
|
||||
Self {
|
||||
tiers,
|
||||
max_block_age,
|
||||
max_block_lag,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.tiers.len()
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.tiers.is_empty()
|
||||
}
|
||||
|
||||
/// get the ConnectionsGroup that contains all rpcs
|
||||
/// panics if there are no tiers
|
||||
pub fn all_rpcs_group(&self) -> Option<&ConnectionsGroup> {
|
||||
self.tiers.values().last()
|
||||
}
|
||||
|
||||
/// get the mutable ConnectionsGroup that contains all rpcs
|
||||
pub fn all_mut(&mut self) -> Option<&mut ConnectionsGroup> {
|
||||
self.tiers.values_mut().last()
|
||||
}
|
||||
|
||||
pub fn remove(&mut self, rpc: &Arc<Web3Rpc>) -> Option<Web3ProxyBlock> {
|
||||
let mut removed = None;
|
||||
|
||||
for (i, tier_group) in self.tiers.iter_mut().rev() {
|
||||
if i < &rpc.tier {
|
||||
break;
|
||||
}
|
||||
let x = tier_group.remove(rpc);
|
||||
|
||||
if removed.is_none() && x.is_some() {
|
||||
removed = x;
|
||||
}
|
||||
}
|
||||
|
||||
removed
|
||||
}
|
||||
|
||||
/// returns the block that the rpc was on before updating to the new_block
|
||||
pub async fn insert(
|
||||
&mut self,
|
||||
rpc: &Arc<Web3Rpc>,
|
||||
new_block: Web3ProxyBlock,
|
||||
) -> Option<Web3ProxyBlock> {
|
||||
let mut old = None;
|
||||
|
||||
// TODO: error if rpc.tier is not in self.tiers
|
||||
|
||||
for (i, tier_group) in self.tiers.iter_mut().rev() {
|
||||
if i < &rpc.tier {
|
||||
break;
|
||||
}
|
||||
|
||||
// TODO: should new_block be a ref?
|
||||
let x = tier_group.insert(rpc.clone(), new_block.clone()).await;
|
||||
|
||||
if old.is_none() && x.is_some() {
|
||||
old = x;
|
||||
}
|
||||
}
|
||||
|
||||
old
|
||||
self.rpc_heads.insert(rpc, block)
|
||||
}
|
||||
|
||||
/// Update our tracking of the rpc and return true if something changed
|
||||
@ -486,8 +180,8 @@ impl ConsensusFinder {
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(prev_block) = self.insert(&rpc, rpc_head_block.clone()).await {
|
||||
// false if this block was already sent by this rpc. return early
|
||||
if let Some(prev_block) = self.insert(rpc, rpc_head_block.clone()).await {
|
||||
// false if this block was already sent by this rpc
|
||||
// true if new block for this rpc
|
||||
prev_block.hash() != rpc_head_block.hash()
|
||||
} else {
|
||||
@ -505,47 +199,183 @@ impl ConsensusFinder {
|
||||
Ok(changed)
|
||||
}
|
||||
|
||||
pub async fn best_consensus_connections(
|
||||
pub async fn find_consensus_connections(
|
||||
&mut self,
|
||||
authorization: &Arc<Authorization>,
|
||||
web3_connections: &Web3Rpcs,
|
||||
) -> Web3ProxyResult<ConsensusWeb3Rpcs> {
|
||||
// TODO: attach context to these?
|
||||
let highest_known_block = self
|
||||
.all_rpcs_group()
|
||||
.web3_context("no rpcs")?
|
||||
.highest_block
|
||||
.as_ref()
|
||||
.web3_context("no highest block")?;
|
||||
web3_rpcs: &Web3Rpcs,
|
||||
) -> Web3ProxyResult<Option<ConsensusWeb3Rpcs>> {
|
||||
let minmax_block = self.rpc_heads.values().minmax_by_key(|&x| x.number());
|
||||
|
||||
trace!("highest_known_block: {}", highest_known_block);
|
||||
let (lowest_block, highest_block) = match minmax_block {
|
||||
MinMaxResult::NoElements => return Ok(None),
|
||||
MinMaxResult::OneElement(x) => (x, x),
|
||||
MinMaxResult::MinMax(min, max) => (min, max),
|
||||
};
|
||||
|
||||
let min_block_num = self
|
||||
.max_block_lag
|
||||
.map(|x| highest_known_block.number().saturating_sub(x))
|
||||
// we also want to be sure we don't ever go backwards!
|
||||
.max(web3_connections.head_block_num());
|
||||
let highest_block_number = highest_block.number();
|
||||
|
||||
trace!("min_block_num: {:#?}", min_block_num);
|
||||
trace!("highest_block_number: {}", highest_block_number);
|
||||
|
||||
// TODO Should this be a Vec<Result<Option<_, _>>>?
|
||||
// TODO: how should errors be handled?
|
||||
// TODO: find the best tier with a connectionsgroup. best case, this only queries the first tier
|
||||
// TODO: do we need to calculate all of them? I think having highest_known_block included as part of min_block_num should make that unnecessary
|
||||
for (tier, x) in self.tiers.iter() {
|
||||
trace!("checking tier {}: {:#?}", tier, x.rpc_to_block);
|
||||
if let Ok(consensus_head_connections) = x
|
||||
.consensus_head_connections(authorization, web3_connections, min_block_num, tier)
|
||||
.await
|
||||
{
|
||||
trace!("success on tier {}", tier);
|
||||
// we got one! hopefully it didn't need to use any backups.
|
||||
// but even if it did need backup servers, that is better than going to a worse tier
|
||||
return Ok(consensus_head_connections);
|
||||
trace!("lowest_block_number: {}", lowest_block.number());
|
||||
|
||||
let max_lag_block_number = highest_block_number
|
||||
.saturating_sub(self.max_block_lag.unwrap_or_else(|| U64::from(10)));
|
||||
|
||||
trace!("max_lag_block_number: {}", max_lag_block_number);
|
||||
|
||||
let lowest_block_number = lowest_block.number().max(&max_lag_block_number);
|
||||
|
||||
trace!("safe lowest_block_number: {}", lowest_block_number);
|
||||
|
||||
let num_known = self.rpc_heads.len();
|
||||
|
||||
if num_known < web3_rpcs.min_head_rpcs {
|
||||
// this keeps us from serving requests when the proxy first starts
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// TODO: also track the sum of *available* hard_limits? if any servers have no hard limits, use their soft limit or no limit?
|
||||
// TODO: struct for the value of the votes hashmap?
|
||||
let mut primary_votes: HashMap<Web3ProxyBlock, (HashSet<&str>, u32)> = Default::default();
|
||||
let mut backup_votes: HashMap<Web3ProxyBlock, (HashSet<&str>, u32)> = Default::default();
|
||||
|
||||
let mut backup_consensus = None;
|
||||
|
||||
let mut rpc_heads_by_tier: Vec<_> = self.rpc_heads.iter().collect();
|
||||
rpc_heads_by_tier.sort_by_cached_key(|(rpc, _)| rpc.tier);
|
||||
|
||||
let current_tier = rpc_heads_by_tier
|
||||
.first()
|
||||
.expect("rpc_heads_by_tier should never be empty")
|
||||
.0
|
||||
.tier;
|
||||
|
||||
// loop over all the rpc heads (grouped by tier) and their parents to find consensus
|
||||
// TODO: i'm sure theres a lot of shortcuts that could be taken, but this is simplest to implement
|
||||
for (rpc, rpc_head) in self.rpc_heads.iter() {
|
||||
if current_tier != rpc.tier {
|
||||
// we finished processing a tier. check for primary results
|
||||
if let Some(consensus) = self.count_votes(&primary_votes, web3_rpcs) {
|
||||
return Ok(Some(consensus));
|
||||
}
|
||||
|
||||
// only set backup consensus once. we don't want it to keep checking on worse tiers if it already found consensus
|
||||
if backup_consensus.is_none() {
|
||||
if let Some(consensus) = self.count_votes(&backup_votes, web3_rpcs) {
|
||||
backup_consensus = Some(consensus)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut block_to_check = rpc_head.clone();
|
||||
|
||||
while block_to_check.number() >= lowest_block_number {
|
||||
if !rpc.backup {
|
||||
// backup nodes are excluded from the primary voting
|
||||
let entry = primary_votes.entry(block_to_check.clone()).or_default();
|
||||
|
||||
entry.0.insert(&rpc.name);
|
||||
entry.1 += rpc.soft_limit;
|
||||
}
|
||||
|
||||
// both primary and backup rpcs get included in the backup voting
|
||||
let backup_entry = backup_votes.entry(block_to_check.clone()).or_default();
|
||||
|
||||
backup_entry.0.insert(&rpc.name);
|
||||
backup_entry.1 += rpc.soft_limit;
|
||||
|
||||
match web3_rpcs
|
||||
.block(authorization, block_to_check.parent_hash(), Some(rpc))
|
||||
.await
|
||||
{
|
||||
Ok(parent_block) => block_to_check = parent_block,
|
||||
Err(err) => {
|
||||
warn!("Problem fetching parent block of {:#?} during consensus finding: {:#?}", block_to_check, err);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return Err(anyhow::anyhow!("failed finding consensus on all tiers").into());
|
||||
// we finished processing all tiers. check for primary results (if anything but the last tier found consensus, we already returned above)
|
||||
if let Some(consensus) = self.count_votes(&primary_votes, web3_rpcs) {
|
||||
return Ok(Some(consensus));
|
||||
}
|
||||
|
||||
// only set backup consensus once. we don't want it to keep checking on worse tiers if it already found consensus
|
||||
if let Some(consensus) = backup_consensus {
|
||||
return Ok(Some(consensus));
|
||||
}
|
||||
|
||||
// count votes one last time
|
||||
Ok(self.count_votes(&backup_votes, web3_rpcs))
|
||||
}
|
||||
|
||||
// TODO: have min_sum_soft_limit and min_head_rpcs on self instead of on Web3Rpcs
|
||||
fn count_votes(
|
||||
&self,
|
||||
votes: &HashMap<Web3ProxyBlock, (HashSet<&str>, u32)>,
|
||||
web3_rpcs: &Web3Rpcs,
|
||||
) -> Option<ConsensusWeb3Rpcs> {
|
||||
// sort the primary votes ascending by tier and descending by block num
|
||||
let mut votes: Vec<_> = votes
|
||||
.iter()
|
||||
.map(|(block, (rpc_names, sum_soft_limit))| (block, sum_soft_limit, rpc_names))
|
||||
.collect();
|
||||
votes.sort_by_cached_key(|(block, sum_soft_limit, rpc_names)| {
|
||||
(
|
||||
Reverse(*block.number()),
|
||||
Reverse(*sum_soft_limit),
|
||||
Reverse(rpc_names.len()),
|
||||
)
|
||||
});
|
||||
|
||||
// return the first result that exceededs confgured minimums (if any)
|
||||
for (maybe_head_block, sum_soft_limit, rpc_names) in votes {
|
||||
if *sum_soft_limit < web3_rpcs.min_sum_soft_limit {
|
||||
continue;
|
||||
}
|
||||
// TODO: different mins for backup vs primary
|
||||
if rpc_names.len() < web3_rpcs.min_head_rpcs {
|
||||
continue;
|
||||
}
|
||||
|
||||
trace!("rpc_names: {:#?}", rpc_names);
|
||||
|
||||
// consensus likely found! load the rpcs to make sure they all have active connections
|
||||
let consensus_rpcs: Vec<_> = rpc_names
|
||||
.into_iter()
|
||||
.filter_map(|x| web3_rpcs.get(x))
|
||||
.collect();
|
||||
|
||||
if consensus_rpcs.len() < web3_rpcs.min_head_rpcs {
|
||||
continue;
|
||||
}
|
||||
// consensus found!
|
||||
|
||||
let tier = consensus_rpcs
|
||||
.iter()
|
||||
.map(|x| x.tier)
|
||||
.max()
|
||||
.expect("there should always be a max");
|
||||
|
||||
let backups_needed = consensus_rpcs.iter().any(|x| x.backup);
|
||||
|
||||
let consensus = ConsensusWeb3Rpcs {
|
||||
tier,
|
||||
head_block: maybe_head_block.clone(),
|
||||
best_rpcs: consensus_rpcs,
|
||||
backups_needed,
|
||||
};
|
||||
|
||||
return Some(consensus);
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
pub fn worst_tier(&self) -> Option<u64> {
|
||||
self.rpc_heads.iter().map(|(x, _)| x.tier).max()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -29,12 +29,13 @@ use serde::ser::{SerializeStruct, Serializer};
|
||||
use serde::Serialize;
|
||||
use serde_json::json;
|
||||
use serde_json::value::RawValue;
|
||||
use std::cmp::min_by_key;
|
||||
use std::cmp::{min_by_key, Reverse};
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::atomic::{self, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::{cmp, fmt};
|
||||
use thread_fast_rng::rand::seq::SliceRandom;
|
||||
use tokio;
|
||||
use tokio::sync::{broadcast, watch};
|
||||
use tokio::task;
|
||||
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
|
||||
@ -397,7 +398,7 @@ impl Web3Rpcs {
|
||||
let clone = self.clone();
|
||||
let authorization = authorization.clone();
|
||||
let pending_tx_id_receiver = self.pending_tx_id_receiver.clone();
|
||||
let handle = task::spawn(async move {
|
||||
let handle = tokio::task::spawn(async move {
|
||||
// TODO: set up this future the same as the block funnel
|
||||
while let Ok((pending_tx_id, rpc)) = pending_tx_id_receiver.recv_async().await {
|
||||
let f = clone.clone().process_incoming_tx_id(
|
||||
@ -420,7 +421,7 @@ impl Web3Rpcs {
|
||||
let connections = Arc::clone(&self);
|
||||
let pending_tx_sender = pending_tx_sender.clone();
|
||||
|
||||
let handle = task::Builder::default()
|
||||
let handle = tokio::task::Builder::default()
|
||||
.name("process_incoming_blocks")
|
||||
.spawn(async move {
|
||||
connections
|
||||
@ -434,12 +435,14 @@ impl Web3Rpcs {
|
||||
if futures.is_empty() {
|
||||
// no transaction or block subscriptions.
|
||||
|
||||
let handle = task::Builder::default().name("noop").spawn(async move {
|
||||
loop {
|
||||
sleep(Duration::from_secs(600)).await;
|
||||
// TODO: "every interval, check that the provider is still connected"
|
||||
}
|
||||
})?;
|
||||
let handle = tokio::task::Builder::default()
|
||||
.name("noop")
|
||||
.spawn(async move {
|
||||
loop {
|
||||
sleep(Duration::from_secs(600)).await;
|
||||
// TODO: "every interval, check that the provider is still connected"
|
||||
}
|
||||
})?;
|
||||
|
||||
futures.push(flatten_handle(handle));
|
||||
}
|
||||
@ -557,7 +560,7 @@ impl Web3Rpcs {
|
||||
// TODO: double check the logic on this. especially if only min is set
|
||||
let needed_blocks_comparison = match (min_block_needed, max_block_needed) {
|
||||
(None, None) => {
|
||||
// no required block given. treat this like the requested the consensus head block
|
||||
// no required block given. treat this like they requested the consensus head block
|
||||
cmp::Ordering::Equal
|
||||
}
|
||||
(None, Some(max_block_needed)) => max_block_needed.cmp(head_block_num),
|
||||
@ -661,7 +664,7 @@ impl Web3Rpcs {
|
||||
// they are all at the same block and it is already sized to what we need
|
||||
let key = (0, None);
|
||||
|
||||
for x in synced_connections.rpcs.iter() {
|
||||
for x in synced_connections.best_rpcs.iter() {
|
||||
if skip.contains(x) {
|
||||
trace!("skipping: {}", x);
|
||||
continue;
|
||||
@ -789,7 +792,7 @@ impl Web3Rpcs {
|
||||
let synced_rpcs = self.watch_consensus_rpcs_sender.borrow();
|
||||
|
||||
if let Some(synced_rpcs) = synced_rpcs.as_ref() {
|
||||
synced_rpcs.rpcs.clone()
|
||||
synced_rpcs.best_rpcs.clone()
|
||||
} else {
|
||||
vec![]
|
||||
}
|
||||
@ -932,7 +935,7 @@ impl Web3Rpcs {
|
||||
request.id.clone(),
|
||||
) {
|
||||
Ok(response) => {
|
||||
if let Some(error) = &response.error.as_ref() {
|
||||
if let Some(error) = response.error.as_ref() {
|
||||
// trace!(?response, "rpc error");
|
||||
|
||||
if let Some(request_metadata) = request_metadata {
|
||||
@ -1007,8 +1010,10 @@ impl Web3Rpcs {
|
||||
|
||||
// TODO: emit a stat. if a server is getting skipped a lot, something is not right
|
||||
|
||||
// TODO: if we get a TrySendError, reconnect. wait why do we see a trysenderror on a dual provider? shouldn't it be using reqwest
|
||||
|
||||
debug!(
|
||||
"Backend server error on {}! Retrying on another. err={:?}",
|
||||
"Backend server error on {}! Retrying on another. err={:#?}",
|
||||
rpc, err
|
||||
);
|
||||
|
||||
@ -1064,20 +1069,28 @@ impl Web3Rpcs {
|
||||
let num_conns = self.by_name.read().len();
|
||||
let num_skipped = skip_rpcs.len();
|
||||
|
||||
let consensus = watch_consensus_connections.borrow();
|
||||
|
||||
let head_block_num = consensus.as_ref().map(|x| x.head_block.number());
|
||||
|
||||
if num_skipped == 0 {
|
||||
error!(
|
||||
"No servers synced ({:?}-{:?}) ({} known). None skipped",
|
||||
min_block_needed, max_block_needed, num_conns
|
||||
"No servers synced ({:?}-{:?}, {:?}) ({} known). None skipped",
|
||||
min_block_needed, max_block_needed, head_block_num, num_conns
|
||||
);
|
||||
debug!("{}", serde_json::to_string(&request).unwrap());
|
||||
|
||||
// TODO: remove this, or move to trace level
|
||||
// debug!("{}", serde_json::to_string(&request).unwrap());
|
||||
} else {
|
||||
// TODO: warn? debug? trace?
|
||||
warn!(
|
||||
"Requested data was not available on {}/{} servers",
|
||||
num_skipped, num_conns
|
||||
// TODO: error? warn? debug? trace?
|
||||
error!(
|
||||
"Requested data is not available ({:?}-{:?}, {:?}) ({} skipped, {} known)",
|
||||
min_block_needed, max_block_needed, head_block_num, num_skipped, num_conns
|
||||
);
|
||||
}
|
||||
|
||||
drop(consensus);
|
||||
|
||||
// TODO: what error code?
|
||||
// cloudflare gives {"jsonrpc":"2.0","error":{"code":-32043,"message":"Requested data cannot be older than 128 blocks."},"id":1}
|
||||
Ok(JsonRpcForwardedResponse::from_str(
|
||||
@ -1271,9 +1284,8 @@ impl Serialize for Web3Rpcs {
|
||||
/// TODO: should this be moved into a `impl Web3Rpc`?
|
||||
/// TODO: i think we still have sorts scattered around the code that should use this
|
||||
/// TODO: take AsRef or something like that? We don't need an Arc here
|
||||
fn rpc_sync_status_sort_key(x: &Arc<Web3Rpc>) -> (U64, u64, bool, OrderedFloat<f64>) {
|
||||
let reversed_head_block = U64::MAX
|
||||
- x.head_block
|
||||
fn rpc_sync_status_sort_key(x: &Arc<Web3Rpc>) -> (Reverse<U64>, u64, bool, OrderedFloat<f64>) {
|
||||
let head_block = x.head_block
|
||||
.read()
|
||||
.as_ref()
|
||||
.map(|x| *x.number())
|
||||
@ -1293,7 +1305,7 @@ fn rpc_sync_status_sort_key(x: &Arc<Web3Rpc>) -> (U64, u64, bool, OrderedFloat<f
|
||||
|
||||
let backup = x.backup;
|
||||
|
||||
(reversed_head_block, tier, backup, peak_ewma)
|
||||
(Reverse(head_block), tier, backup, peak_ewma)
|
||||
}
|
||||
|
||||
mod tests {
|
||||
|
@ -10,8 +10,9 @@ use crate::rpcs::request::RequestErrorHandler;
|
||||
use anyhow::{anyhow, Context};
|
||||
use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64};
|
||||
use ethers::types::{Address, Transaction, U256};
|
||||
use futures::future::try_join_all;
|
||||
use futures::StreamExt;
|
||||
use futures::future::try_join_all;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use log::{debug, error, info, trace, warn, Level};
|
||||
use migration::sea_orm::DatabaseConnection;
|
||||
use ordered_float::OrderedFloat;
|
||||
@ -723,8 +724,14 @@ impl Web3Rpc {
|
||||
} else {
|
||||
RequestErrorHandler::ErrorLevel
|
||||
};
|
||||
|
||||
let mut delay_start = false;
|
||||
|
||||
// this does loop. just only when reconnect is enabled
|
||||
#[allow(clippy::never_loop)]
|
||||
loop {
|
||||
debug!("subscription loop started");
|
||||
|
||||
let mut futures = vec![];
|
||||
|
||||
let http_interval_receiver = http_interval_sender.as_ref().map(|x| x.subscribe());
|
||||
@ -741,7 +748,7 @@ impl Web3Rpc {
|
||||
block_sender.as_ref(),
|
||||
chain_id,
|
||||
authorization.db_conn.as_ref(),
|
||||
false,
|
||||
delay_start,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@ -758,6 +765,7 @@ impl Web3Rpc {
|
||||
|
||||
// health check loop
|
||||
loop {
|
||||
// TODO: do we need this to be abortable?
|
||||
if rpc.should_disconnect() {
|
||||
break;
|
||||
}
|
||||
@ -864,6 +872,7 @@ impl Web3Rpc {
|
||||
}
|
||||
|
||||
if let Some(block_sender) = &block_sender {
|
||||
// TODO: do we need this to be abortable?
|
||||
let f = self.clone().subscribe_new_heads(
|
||||
authorization.clone(),
|
||||
http_interval_receiver,
|
||||
@ -875,6 +884,7 @@ impl Web3Rpc {
|
||||
}
|
||||
|
||||
if let Some(tx_id_sender) = &tx_id_sender {
|
||||
// TODO: do we need this to be abortable?
|
||||
let f = self
|
||||
.clone()
|
||||
.subscribe_pending_transactions(authorization.clone(), tx_id_sender.clone());
|
||||
@ -884,27 +894,36 @@ impl Web3Rpc {
|
||||
|
||||
match try_join_all(futures).await {
|
||||
Ok(_) => {
|
||||
// futures all exited without error. break instead of restarting subscriptions
|
||||
// future exited without error
|
||||
// TODO: think about this more. we never set it to false. this can't be right
|
||||
break;
|
||||
}
|
||||
Err(err) => {
|
||||
if self.reconnect.load(atomic::Ordering::Acquire) {
|
||||
warn!("{} connection ended. err={:?}", self, err);
|
||||
let disconnect_sender = self.disconnect_watch.as_ref().unwrap();
|
||||
|
||||
self.clone()
|
||||
.retrying_connect(
|
||||
block_sender.as_ref(),
|
||||
chain_id,
|
||||
authorization.db_conn.as_ref(),
|
||||
true,
|
||||
)
|
||||
.await?;
|
||||
} else if *disconnect_receiver.borrow() {
|
||||
if self.reconnect.load(atomic::Ordering::Acquire) {
|
||||
warn!("{} connection ended. reconnecting. err={:?}", self, err);
|
||||
|
||||
// TODO: i'm not sure if this is necessary, but telling everything to disconnect seems like a better idea than relying on timeouts and dropped futures.
|
||||
disconnect_sender.send_replace(true);
|
||||
disconnect_sender.send_replace(false);
|
||||
|
||||
// we call retrying_connect here with initial_delay=true. above, initial_delay=false
|
||||
delay_start = true;
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
// reconnect is not enabled.
|
||||
if *disconnect_receiver.borrow() {
|
||||
info!("{} is disconnecting", self);
|
||||
break;
|
||||
} else {
|
||||
error!("{} subscription exited. err={:?}", self, err);
|
||||
return Err(err);
|
||||
|
||||
disconnect_sender.send_replace(true);
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1095,7 +1114,11 @@ impl Web3Rpc {
|
||||
self.send_head_block_result(Ok(None), &block_sender, block_map)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
if self.should_disconnect() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(anyhow!("new_heads subscription exited. reconnect needed"))
|
||||
}
|
||||
}
|
||||
|
||||
/// Turn on the firehose of pending transactions
|
||||
@ -1149,7 +1172,11 @@ impl Web3Rpc {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
if self.should_disconnect() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(anyhow!("pending_transactions subscription exited. reconnect needed"))
|
||||
}
|
||||
}
|
||||
|
||||
/// be careful with this; it might wait forever!
|
||||
|
@ -170,8 +170,9 @@ impl OpenRequestHandle {
|
||||
};
|
||||
|
||||
let mut logged = false;
|
||||
while provider.is_none() {
|
||||
while provider.is_none() || provider.as_ref().map(|x| !x.ready()).unwrap() {
|
||||
// trace!("waiting on provider: locking...");
|
||||
// TODO: i dont like this. subscribing to a channel could be better
|
||||
sleep(Duration::from_millis(100)).await;
|
||||
|
||||
if !logged {
|
||||
@ -197,7 +198,7 @@ impl OpenRequestHandle {
|
||||
// TODO: replace ethers-rs providers with our own that supports streaming the responses
|
||||
let response = match provider.as_ref() {
|
||||
#[cfg(test)]
|
||||
Web3Provider::Mock => unimplemented!(),
|
||||
Web3Provider::Mock => return Err(ProviderError::CustomError("mock provider can't respond".to_string())),
|
||||
Web3Provider::Ws(p) => p.request(method, params).await,
|
||||
Web3Provider::Http(p) | Web3Provider::Both(p, _) => {
|
||||
// TODO: i keep hearing that http is faster. but ws has always been better for me. investigate more with actual benchmarks
|
||||
|
@ -35,11 +35,12 @@ pub enum StatType {
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct RpcQueryStats {
|
||||
pub authorization: Arc<Authorization>,
|
||||
pub method: String,
|
||||
pub method: Option<String>,
|
||||
pub archive_request: bool,
|
||||
pub error_response: bool,
|
||||
pub request_bytes: u64,
|
||||
/// if backend_requests is 0, there was a cache_hit
|
||||
// pub frontend_request: u64,
|
||||
pub backend_requests: u64,
|
||||
pub response_bytes: u64,
|
||||
pub response_millis: u64,
|
||||
@ -96,7 +97,7 @@ impl RpcQueryStats {
|
||||
TrackingLevel::Detailed => {
|
||||
// detailed tracking keeps track of the method and origin
|
||||
// depending on the request, the origin might still be None
|
||||
let method = Some(self.method.clone());
|
||||
let method = self.method.clone();
|
||||
let origin = self.authorization.origin.clone();
|
||||
|
||||
(method, origin)
|
||||
@ -116,7 +117,7 @@ impl RpcQueryStats {
|
||||
/// all queries are aggregated
|
||||
/// TODO: should we store "anon" or "registered" as a key just to be able to split graphs?
|
||||
fn global_timeseries_key(&self) -> RpcQueryKey {
|
||||
let method = Some(self.method.clone());
|
||||
let method = self.method.clone();
|
||||
// we don't store origin in the timeseries db. its only used for optional accounting
|
||||
let origin = None;
|
||||
// everyone gets grouped together
|
||||
@ -140,7 +141,7 @@ impl RpcQueryStats {
|
||||
TrackingLevel::None => {
|
||||
// this RPC key requested no tracking. this is the default.
|
||||
// we still want graphs though, so we just use None as the rpc_secret_key_id
|
||||
(Some(self.method.clone()), None)
|
||||
(self.method.clone(), None)
|
||||
}
|
||||
TrackingLevel::Aggregated => {
|
||||
// this RPC key requested tracking aggregated across all methods
|
||||
@ -149,7 +150,7 @@ impl RpcQueryStats {
|
||||
TrackingLevel::Detailed => {
|
||||
// detailed tracking keeps track of the method
|
||||
(
|
||||
Some(self.method.clone()),
|
||||
self.method.clone(),
|
||||
self.authorization.checks.rpc_secret_key_id,
|
||||
)
|
||||
}
|
||||
@ -361,10 +362,10 @@ impl BufferedRpcQueryStats {
|
||||
|
||||
impl RpcQueryStats {
|
||||
pub fn new(
|
||||
method: String,
|
||||
method: Option<String>,
|
||||
authorization: Arc<Authorization>,
|
||||
metadata: Arc<RequestMetadata>,
|
||||
response_bytes: usize
|
||||
response_bytes: usize,
|
||||
) -> Self {
|
||||
// TODO: try_unwrap the metadata to be sure that all the stats for this request have been collected
|
||||
// TODO: otherwise, i think the whole thing should be in a single lock that we can "reset" when a stat is created
|
||||
@ -391,6 +392,17 @@ impl RpcQueryStats {
|
||||
}
|
||||
}
|
||||
|
||||
/// Only used for migration from stats_v1 to stats_v2/v3
|
||||
pub fn modify_struct(
|
||||
&mut self,
|
||||
response_millis: u64,
|
||||
response_timestamp: i64,
|
||||
backend_requests: u64,
|
||||
) {
|
||||
self.response_millis = response_millis;
|
||||
self.response_timestamp = response_timestamp;
|
||||
self.backend_requests = backend_requests;
|
||||
}
|
||||
}
|
||||
|
||||
impl StatBuffer {
|
||||
@ -450,6 +462,7 @@ impl StatBuffer {
|
||||
loop {
|
||||
tokio::select! {
|
||||
stat = stat_receiver.recv_async() => {
|
||||
// info!("Received stat");
|
||||
// save the stat to a buffer
|
||||
match stat {
|
||||
Ok(AppStat::RpcQuery(stat)) => {
|
||||
@ -476,6 +489,7 @@ impl StatBuffer {
|
||||
}
|
||||
}
|
||||
_ = db_save_interval.tick() => {
|
||||
// info!("DB save internal tick");
|
||||
let db_conn = self.db_conn.as_ref().expect("db connection should always exist if there are buffered stats");
|
||||
|
||||
// TODO: batch saves
|
||||
@ -487,6 +501,7 @@ impl StatBuffer {
|
||||
}
|
||||
}
|
||||
_ = tsdb_save_interval.tick() => {
|
||||
// info!("TSDB save internal tick");
|
||||
// TODO: batch saves
|
||||
// TODO: better bucket names
|
||||
let influxdb_client = self.influxdb_client.as_ref().expect("influxdb client should always exist if there are buffered stats");
|
||||
@ -506,6 +521,7 @@ impl StatBuffer {
|
||||
}
|
||||
}
|
||||
x = shutdown_receiver.recv() => {
|
||||
info!("shutdown signal ---");
|
||||
match x {
|
||||
Ok(_) => {
|
||||
info!("stat_loop shutting down");
|
||||
@ -544,13 +560,7 @@ impl StatBuffer {
|
||||
|
||||
for (key, stat) in global_timeseries_buffer.drain() {
|
||||
if let Err(err) = stat
|
||||
.save_timeseries(
|
||||
&bucket,
|
||||
"global_proxy",
|
||||
self.chain_id,
|
||||
influxdb_client,
|
||||
key,
|
||||
)
|
||||
.save_timeseries(&bucket, "global_proxy", self.chain_id, influxdb_client, key)
|
||||
.await
|
||||
{
|
||||
error!(
|
||||
@ -567,13 +577,7 @@ impl StatBuffer {
|
||||
|
||||
for (key, stat) in opt_in_timeseries_buffer.drain() {
|
||||
if let Err(err) = stat
|
||||
.save_timeseries(
|
||||
&bucket,
|
||||
"opt_in_proxy",
|
||||
self.chain_id,
|
||||
influxdb_client,
|
||||
key,
|
||||
)
|
||||
.save_timeseries(&bucket, "opt_in_proxy", self.chain_id, influxdb_client, key)
|
||||
.await
|
||||
{
|
||||
error!(
|
||||
|
Loading…
Reference in New Issue
Block a user