diff --git a/Cargo.lock b/Cargo.lock index c8c817a7..def87e87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2275,7 +2275,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0176e0459c2e4a1fe232f984bca6890e681076abb9934f6cea7c326f3fc47818" dependencies = [ "libc", - "windows-targets 0.48.0", + "windows-targets", ] [[package]] @@ -2440,6 +2440,7 @@ checksum = "7f19b9f54f7c7f55e31401bb647626ce0cf0f67b0004982ce815b3ee72a02aa8" dependencies = [ "base64 0.13.1", "byteorder", + "crossbeam-channel", "flate2", "nom", "num-traits", @@ -3057,9 +3058,9 @@ dependencies = [ [[package]] name = "lock_api" -version = "0.4.9" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df" +checksum = "c1cc9717a20b1bb222f333e6a92fd32f7d8a18ddc5a3191a11af45dcbf4dcd16" dependencies = [ "autocfg", "scopeguard", @@ -3217,9 +3218,9 @@ dependencies = [ [[package]] name = "moka" -version = "0.11.1" +version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36506f2f935238463605f3bb13b362f1949daafc3b347d05d60ae08836db2bd2" +checksum = "206bf83f415b0579fd885fe0804eb828e727636657dc1bf73d80d2f1218e14a1" dependencies = [ "async-io", "async-lock", @@ -3227,7 +3228,6 @@ dependencies = [ "crossbeam-epoch", "crossbeam-utils", "futures-util", - "num_cpus", "once_cell", "parking_lot 0.12.1", "quanta", @@ -3694,7 +3694,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core 0.9.7", + "parking_lot_core 0.9.8", ] [[package]] @@ -3713,18 +3713,18 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.7" +version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521" +checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447" dependencies = [ "backtrace", "cfg-if", "libc", "petgraph", - "redox_syscall 0.2.16", + "redox_syscall 0.3.5", "smallvec", "thread-id", - "windows-sys 0.45.0", + "windows-targets", ] [[package]] @@ -4107,9 +4107,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" [[package]] name = "proc-macro2" -version = "1.0.59" +version = "1.0.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6aeca18b86b413c660b781aa319e4e2648a3e6f9eadc9b47e9038e6fe9f3451b" +checksum = "dec2b086b7a862cf4de201096214fa870344cf922b2b30c167badb3af3195406" dependencies = [ "unicode-ident", ] @@ -4304,9 +4304,9 @@ dependencies = [ [[package]] name = "rdkafka" -version = "0.31.0" +version = "0.32.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88383df3a85a38adfa2aa447d3ab6eb9cedcb49613adcf18e7e7ebb3b62e9b03" +checksum = "f8733bc5dc0b192d1a4b28073f9bff1326ad9e4fecd4d9b025d6fc358d1c3e79" dependencies = [ "futures-channel", "futures-util", @@ -4322,9 +4322,9 @@ dependencies = [ [[package]] name = "rdkafka-sys" -version = "4.4.0+1.9.2" +version = "4.5.0+1.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87ac9d87c3aba1748e3112318459f2ac8bff80bfff7359e338e0463549590249" +checksum = "1bb0676c2112342ac7165decdedbc4e7086c0af384479ccce534546b10687a5d" dependencies = [ "cmake", "libc", @@ -5184,18 +5184,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.163" +version = "1.0.164" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2113ab51b87a539ae008b5c6c02dc020ffa39afd2d83cffcb3f4eb2722cebec2" +checksum = "9e8c8cf938e98f769bc164923b06dce91cea1751522f46f8466461af04c9027d" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.163" +version = "1.0.164" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c805777e3930c8883389c602315a24224bcc738b63905ef87cd1420353ea93e" +checksum = "d9735b638ccc51c28bf6914d90a2e9725b377144fc612c49a611fddd1b631d68" dependencies = [ "proc-macro2", "quote", @@ -5776,15 +5776,16 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" [[package]] name = "tempfile" -version = "3.5.0" +version = "3.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9fbec84f381d5795b08656e4912bec604d162bff9291d6189a78f4c8ab87998" +checksum = "31c0432476357e58790aaa47a8efb0c5138f137343f3b5f23bd36a27e3b0a6d6" dependencies = [ + "autocfg", "cfg-if", "fastrand", "redox_syscall 0.3.5", "rustix", - "windows-sys 0.45.0", + "windows-sys 0.48.0", ] [[package]] @@ -6719,6 +6720,7 @@ dependencies = [ "axum", "axum-client-ip", "axum-macros", + "base64 0.21.2", "chrono", "console-subscriber", "counter", @@ -6737,6 +6739,7 @@ dependencies = [ "glob", "handlebars", "hashbrown 0.14.0", + "hdrhistogram", "hostname", "http", "hyper", @@ -6852,7 +6855,7 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" dependencies = [ - "windows-targets 0.48.0", + "windows-targets", ] [[package]] @@ -6870,37 +6873,13 @@ dependencies = [ "windows_x86_64_msvc 0.42.2", ] -[[package]] -name = "windows-sys" -version = "0.45.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" -dependencies = [ - "windows-targets 0.42.2", -] - [[package]] name = "windows-sys" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" dependencies = [ - "windows-targets 0.48.0", -] - -[[package]] -name = "windows-targets" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" -dependencies = [ - "windows_aarch64_gnullvm 0.42.2", - "windows_aarch64_msvc 0.42.2", - "windows_i686_gnu 0.42.2", - "windows_i686_msvc 0.42.2", - "windows_x86_64_gnu 0.42.2", - "windows_x86_64_gnullvm 0.42.2", - "windows_x86_64_msvc 0.42.2", + "windows-targets", ] [[package]] diff --git a/config/development_polygon.toml b/config/development_polygon.toml index f6eb5743..65a34f21 100644 --- a/config/development_polygon.toml +++ b/config/development_polygon.toml @@ -8,7 +8,7 @@ db_max_connections = 20 # production runs inside docker and so uses "mysql://root:web3_proxy@db:3306/web3_proxy" for db_url db_url = "mysql://root:dev_web3_proxy@127.0.0.1:13306/dev_web3_proxy" -deposit_factory_contract = "0x4e3bc2054788de923a04936c6addb99a05b0ea36" +deposit_factory_contract = "0x4e3BC2054788De923A04936C6ADdB99A05B0Ea36" deposit_topic = "0x45fdc265dc29885b9a485766b03e70978440d38c7c328ee0a14fa40c76c6af54" # a timeseries database is optional. it is used for making pretty graphs diff --git a/config/example.toml b/config/example.toml index ad6ac303..962dc9c4 100644 --- a/config/example.toml +++ b/config/example.toml @@ -11,8 +11,7 @@ db_url = "mysql://root:dev_web3_proxy@127.0.0.1:13306/dev_web3_proxy" # read-only replica useful when running the proxy in multiple regions db_replica_url = "mysql://root:dev_web3_proxy@127.0.0.1:13306/dev_web3_proxy" -deposit_factory_contract = "0x4e3bc2054788de923a04936c6addb99a05b0ea36" -deposit_topic = "0x45fdc265dc29885b9a485766b03e70978440d38c7c328ee0a14fa40c76c6af54" +deposit_factory_contract = "0x4e3BC2054788De923A04936C6ADdB99A05B0Ea36" kafka_urls = "127.0.0.1:19092" kafka_protocol = "plaintext" diff --git a/deferred-rate-limiter/Cargo.toml b/deferred-rate-limiter/Cargo.toml index 52b17b61..38f103b0 100644 --- a/deferred-rate-limiter/Cargo.toml +++ b/deferred-rate-limiter/Cargo.toml @@ -10,5 +10,5 @@ redis-rate-limiter = { path = "../redis-rate-limiter" } anyhow = "1.0.71" hashbrown = "0.14.0" log = "0.4.18" -moka = { version = "0.11.1", features = ["future"] } +moka = { version = "0.11.2", features = ["future"] } tokio = "1.28.2" diff --git a/docs/histograms.txt b/docs/histograms.txt new file mode 100644 index 00000000..65f57edb --- /dev/null +++ b/docs/histograms.txt @@ -0,0 +1,5 @@ +[2023-06-09T18:45:50Z WARN web3_proxy::rpcs::consensus] TODO: find the troughs in the histogram: HISTFAAAADV4nC3GoQ0AIAxE0eNAkRAEwbELQxE2QGG7aEeoaL95fz0ZACq8HKbwb/U5bGXystMAZl8EMw== + +Paste the HIST data here: + +Save the "Decoded histogram data" to a file and upload it here: diff --git a/docs/http routes.txt b/docs/http routes.txt index e6ab9a25..05d96490 100644 --- a/docs/http routes.txt +++ b/docs/http routes.txt @@ -111,11 +111,12 @@ GET /user/balance If valid, displays data about the user's balance and payments as JSON. POST /user/balance/:txid - Not yet implemented. Rate limited by IP. + Rate limited by IP. Checks the ":txid" for a transaction that updates a user's balance. The backend will be watching for these transactions, so this should not be needed in the common case. However, log susbcriptions are not perfect and so it might sometimes be needed. + Any authorized user can call this endpoint for any other user's transaction. GET /user/keys Checks the "AUTHORIZATION" header for a valid bearer token. @@ -141,10 +142,6 @@ GET /subuser/rpc_keys GET /user/deposits Retrieves the user's deposit history. -GET /user/balance/:tx_hash - Accepts a tx_hash and updates the user's balance according to this transaction. - Any authorized user can call this endpoint for any other user's transaction. - GET /user/referral Fetches a user's referral link. diff --git a/entities/Cargo.toml b/entities/Cargo.toml index 8ab86762..60a85ce8 100644 --- a/entities/Cargo.toml +++ b/entities/Cargo.toml @@ -11,7 +11,7 @@ path = "src/mod.rs" [dependencies] sea-orm = "0.11.3" -serde = "1.0.163" +serde = "1.0.164" uuid = "1.3.3" ethers = "2.0.7" ulid = "1.0.0" diff --git a/latency/Cargo.toml b/latency/Cargo.toml index 0f3fbf98..a9448273 100644 --- a/latency/Cargo.toml +++ b/latency/Cargo.toml @@ -9,7 +9,7 @@ edition = "2021" ewma = "0.1.1" flume = "0.10.14" log = "0.4.18" -serde = { version = "1.0.163", features = [] } +serde = { version = "1.0.164", features = [] } tokio = { version = "1.28.2", features = ["full"] } [dev-dependencies] diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 004f03e8..75af82c2 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,2 +1,2 @@ [toolchain] -channel = "nightly-2023-05-25" +channel = "nightly-2023-06-09" diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 21058798..723bedf0 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -67,7 +67,7 @@ itertools = "0.10.5" listenfd = "1.0.1" log = "0.4.18" mimalloc = { version = "0.1.37", optional = true} -moka = { version = "0.11.1", features = ["future"] } +moka = { version = "0.11.2", features = ["future"] } num = "0.4.0" num-traits = "0.2.15" once_cell = { version = "1.18.0" } @@ -76,13 +76,13 @@ pagerduty-rs = { version = "0.1.6", default-features = false, features = ["async parking_lot = { version = "0.12.1", features = ["arc_lock", "nightly", "serde"] } prettytable = "0.10.0" proctitle = "0.1.1" -rdkafka = { version = "0.31.0" } +rdkafka = { version = "0.32.2" } regex = "1.8.4" reqwest = { version = "0.11.18", default-features = false, features = ["deflate", "gzip", "json", "tokio-rustls"] } rmp-serde = "1.1.1" rust_decimal = { version = "1.29.1", features = ["maths"] } sentry = { version = "0.31.3", default-features = false, features = ["backtrace", "contexts", "panic", "anyhow", "reqwest", "rustls", "log", "sentry-log"] } -serde = { version = "1.0.163", features = [] } +serde = { version = "1.0.164", features = [] } serde_json = { version = "1.0.96", default-features = false, features = ["alloc", "raw_value"] } serde_prometheus = "0.2.2" siwe = "0.5.2" @@ -100,6 +100,8 @@ tracing-subscriber = "0.3" ulid = { version = "1.0.0", features = ["rand", "uuid", "serde"] } url = "2.4.0" uuid = { version = "1.3.3", default-features = false, features = ["fast-rng", "serde", "v4", "zerocopy"] } +hdrhistogram = "7.5.2" +base64 = "0.21.2" [dev-dependencies] tokio = { version = "1.28.2", features = ["full", "test-util"] } diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 664c0c26..b8388567 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -49,7 +49,6 @@ use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRa use serde::Serialize; use serde_json::json; use serde_json::value::RawValue; -use std::borrow::Cow; use std::fmt; use std::net::IpAddr; use std::num::NonZeroU64; @@ -1512,15 +1511,13 @@ impl Web3ProxyApp { JsonRpcResponseEnum::from(serde_json::Value::Bool(false)) } "eth_subscribe" => JsonRpcErrorData { - message: Cow::Borrowed( - "notifications not supported. eth_subscribe is only available over a websocket", - ), + message: "notifications not supported. eth_subscribe is only available over a websocket".into(), code: -32601, data: None, } .into(), "eth_unsubscribe" => JsonRpcErrorData { - message: Cow::Borrowed("notifications not supported. eth_unsubscribe is only available over a websocket"), + message: "notifications not supported. eth_unsubscribe is only available over a websocket".into(), code: -32601, data: None, }.into(), @@ -1547,7 +1544,7 @@ impl Web3ProxyApp { // TODO: what error code? // TODO: use Web3ProxyError::BadRequest JsonRpcErrorData { - message: Cow::Borrowed("Invalid request"), + message: "Invalid request".into(), code: -32600, data: None }.into() @@ -1575,7 +1572,7 @@ impl Web3ProxyApp { // TODO: this needs the correct error code in the response // TODO: Web3ProxyError::BadRequest instead? JsonRpcErrorData { - message: Cow::Borrowed("invalid request"), + message: "invalid request".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }.into() @@ -1583,7 +1580,7 @@ impl Web3ProxyApp { } } "test" => JsonRpcErrorData { - message: Cow::Borrowed("The method test does not exist/is not available."), + message: "The method test does not exist/is not available.".into(), code: -32601, data: None, }.into(), diff --git a/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs b/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs index a8ca67a0..f377bee0 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs @@ -310,7 +310,6 @@ mod tests { Web3RpcConfig { http_url: Some(anvil.endpoint()), soft_limit: 100, - tier: 0, ..Default::default() }, ), @@ -319,7 +318,6 @@ mod tests { Web3RpcConfig { ws_url: Some(anvil.ws_endpoint()), soft_limit: 100, - tier: 0, ..Default::default() }, ), diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 7a0779a2..a6315fb1 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -256,9 +256,6 @@ pub struct Web3RpcConfig { /// only use this rpc if everything else is lagging too far. this allows us to ignore fast but very low limit rpcs #[serde(default)] pub backup: bool, - /// All else equal, a server with a lower tier receives all requests - #[serde(default = "default_tier")] - pub tier: u64, /// Subscribe to the firehose of pending transactions /// Don't do this with free rpcs #[serde(default)] @@ -268,10 +265,6 @@ pub struct Web3RpcConfig { pub extra: HashMap, } -fn default_tier() -> u64 { - 0 -} - impl Web3RpcConfig { /// Create a Web3Rpc from config /// TODO: move this into Web3Rpc? (just need to make things pub(crate)) diff --git a/web3_proxy/src/errors.rs b/web3_proxy/src/errors.rs index 7ac37da2..ecc0f66b 100644 --- a/web3_proxy/src/errors.rs +++ b/web3_proxy/src/errors.rs @@ -49,7 +49,7 @@ pub enum Web3ProxyError { BadRequest(Cow<'static, str>), #[error(ignore)] #[from(ignore)] - BadResponse(String), + BadResponse(Cow<'static, str>), BadRouting, Contract(ContractError), Database(DbErr), @@ -61,6 +61,7 @@ pub enum Web3ProxyError { EthersWsClient(ethers::prelude::WsClientError), FlumeRecv(flume::RecvError), GasEstimateNotU256, + HdrRecord(hdrhistogram::errors::RecordError), Headers(headers::Error), HeaderToString(ToStrError), Hyper(hyper::Error), @@ -133,7 +134,7 @@ pub enum Web3ProxyError { SerdeJson(serde_json::Error), /// simple way to return an error message to the user and an anyhow to our logs #[display(fmt = "{}, {}, {:?}", _0, _1, _2)] - StatusCode(StatusCode, String, Option), + StatusCode(StatusCode, Cow<'static, str>, Option), /// TODO: what should be attached to the timout? #[display(fmt = "{:?}", _0)] #[error(ignore)] @@ -152,7 +153,7 @@ pub enum Web3ProxyError { WebsocketOnly, #[display(fmt = "{:?}, {}", _0, _1)] #[error(ignore)] - WithContext(Option>, String), + WithContext(Option>, Cow<'static, str>), } impl Web3ProxyError { @@ -164,7 +165,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Owned(err.to_string()), + message: err.to_string().into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -176,7 +177,7 @@ impl Web3ProxyError { ( StatusCode::FORBIDDEN, JsonRpcErrorData { - message: Cow::Borrowed("FORBIDDEN"), + message: "FORBIDDEN".into(), code: StatusCode::FORBIDDEN.as_u16().into(), data: None, }, @@ -188,7 +189,7 @@ impl Web3ProxyError { StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { // TODO: is it safe to expose all of our anyhow strings? - message: Cow::Owned(err.to_string()), + message: err.to_string().into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -203,7 +204,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Owned(format!("bad request: {}", err)), + message: format!("bad request: {}", err).into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -215,7 +216,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Owned(format!("bad response: {}", err)), + message: format!("bad response: {}", err).into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -226,7 +227,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Borrowed("bad routing"), + message: "bad routing".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -237,7 +238,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Borrowed("database error!"), + message: "database error!".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -248,7 +249,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Owned(format!("contract error: {}", err)), + message: format!("contract error: {}", err).into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -259,7 +260,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Owned(format!("decimal error: {}", err)), + message: format!("decimal error: {}", err).into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -273,10 +274,11 @@ impl Web3ProxyError { ( StatusCode::UNAUTHORIZED, JsonRpcErrorData { - message: Cow::Owned(format!( + message: format!( "both the primary and eip191 verification failed: {:#?}; {:#?}", err_1, err_191 - )), + ) + .into(), code: StatusCode::UNAUTHORIZED.as_u16().into(), data: None, }, @@ -287,7 +289,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Borrowed("ether http client error"), + message: "ether http client error".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -298,7 +300,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Borrowed("ether provider error"), + message: "ether provider error".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -309,7 +311,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Borrowed("ether ws client error"), + message: "ether ws client error".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -320,7 +322,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Borrowed("flume recv error!"), + message: "flume recv error!".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -332,7 +334,18 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Borrowed("gas estimate result is not an U256"), + message: "gas estimate result is not an U256".into(), + code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), + data: None, + }, + ) + } + Self::HdrRecord(err) => { + warn!("HdrRecord {:?}", err); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcErrorData { + message: err.to_string().into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -343,7 +356,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Owned(format!("{}", err)), + message: err.to_string().into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -355,7 +368,7 @@ impl Web3ProxyError { StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { // TODO: is it safe to expose these error strings? - message: Cow::Owned(err.to_string()), + message: err.to_string().into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -367,7 +380,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Borrowed("influxdb2 error!"), + message: "influxdb2 error!".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -378,10 +391,11 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Owned(format!( + message: format!( "Invalid blocks bounds requested. min ({}) > max ({})", min, max - )), + ) + .into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -392,7 +406,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Owned(err.to_string()), + message: err.to_string().into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -403,7 +417,7 @@ impl Web3ProxyError { ( StatusCode::FORBIDDEN, JsonRpcErrorData { - message: Cow::Owned(format!("IP ({}) is not allowed!", ip)), + message: format!("IP ({}) is not allowed!", ip).into(), code: StatusCode::FORBIDDEN.as_u16().into(), data: None, }, @@ -414,7 +428,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Owned(format!("{}", err)), + message: err.to_string().into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -425,7 +439,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Borrowed("invalid message eip given"), + message: "invalid message eip given".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -436,7 +450,7 @@ impl Web3ProxyError { ( StatusCode::UNAUTHORIZED, JsonRpcErrorData { - message: Cow::Borrowed("invalid invite code"), + message: "invalid invite code".into(), code: StatusCode::UNAUTHORIZED.as_u16().into(), data: None, }, @@ -448,7 +462,7 @@ impl Web3ProxyError { StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { // TODO: is it safe to expose our io error strings? - message: Cow::Owned(err.to_string()), + message: err.to_string().into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -459,7 +473,7 @@ impl Web3ProxyError { ( StatusCode::UNAUTHORIZED, JsonRpcErrorData { - message: Cow::Borrowed("invalid referral code"), + message: "invalid referral code".into(), code: StatusCode::UNAUTHORIZED.as_u16().into(), data: None, }, @@ -470,7 +484,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Borrowed("invalid referer!"), + message: "invalid referer!".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -481,7 +495,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Borrowed("invalid signature length"), + message: "invalid signature length".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -492,7 +506,7 @@ impl Web3ProxyError { ( StatusCode::FORBIDDEN, JsonRpcErrorData { - message: Cow::Borrowed("invalid user agent!"), + message: "invalid user agent!".into(), code: StatusCode::FORBIDDEN.as_u16().into(), data: None, }, @@ -503,7 +517,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Borrowed("UserKey was not a ULID or UUID"), + message: "UserKey was not a ULID or UUID".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -514,7 +528,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Borrowed("UserTier is not valid!"), + message: "UserTier is not valid!".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -533,7 +547,7 @@ impl Web3ProxyError { code, JsonRpcErrorData { // TODO: different messages of cancelled or not? - message: Cow::Borrowed("Unable to complete request"), + message: "Unable to complete request".into(), code: code.as_u16().into(), data: None, }, @@ -548,7 +562,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Owned(format!("msgpack encode error: {}", err)), + message: format!("msgpack encode error: {}", err).into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -559,7 +573,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Borrowed("Blocks here must have a number or hash"), + message: "Blocks here must have a number or hash".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -570,7 +584,7 @@ impl Web3ProxyError { ( StatusCode::BAD_GATEWAY, JsonRpcErrorData { - message: Cow::Borrowed("no blocks known"), + message: "no blocks known".into(), code: StatusCode::BAD_GATEWAY.as_u16().into(), data: None, }, @@ -581,7 +595,7 @@ impl Web3ProxyError { ( StatusCode::BAD_GATEWAY, JsonRpcErrorData { - message: Cow::Borrowed("no consensus head block"), + message: "no consensus head block".into(), code: StatusCode::BAD_GATEWAY.as_u16().into(), data: None, }, @@ -592,7 +606,7 @@ impl Web3ProxyError { ( StatusCode::BAD_GATEWAY, JsonRpcErrorData { - message: Cow::Borrowed("unable to retry for request handle"), + message: "unable to retry for request handle".into(), code: StatusCode::BAD_GATEWAY.as_u16().into(), data: None, }, @@ -603,7 +617,7 @@ impl Web3ProxyError { ( StatusCode::BAD_GATEWAY, JsonRpcErrorData { - message: Cow::Borrowed("no servers synced"), + message: "no servers synced".into(), code: StatusCode::BAD_GATEWAY.as_u16().into(), data: None, }, @@ -617,10 +631,11 @@ impl Web3ProxyError { ( StatusCode::BAD_GATEWAY, JsonRpcErrorData { - message: Cow::Owned(format!( + message: format!( "not enough rpcs connected {}/{}", num_known, min_head_rpcs - )), + ) + .into(), code: StatusCode::BAD_GATEWAY.as_u16().into(), data: None, }, @@ -631,10 +646,11 @@ impl Web3ProxyError { ( StatusCode::BAD_GATEWAY, JsonRpcErrorData { - message: Cow::Owned(format!( + message: format!( "not enough soft limit available {}/{}", available, needed - )), + ) + .into(), code: StatusCode::BAD_GATEWAY.as_u16().into(), data: None, }, @@ -646,7 +662,7 @@ impl Web3ProxyError { ( StatusCode::NOT_FOUND, JsonRpcErrorData { - message: Cow::Borrowed("not found!"), + message: "not found!".into(), code: StatusCode::NOT_FOUND.as_u16().into(), data: None, }, @@ -657,7 +673,7 @@ impl Web3ProxyError { ( StatusCode::NOT_IMPLEMENTED, JsonRpcErrorData { - message: Cow::Borrowed("work in progress"), + message: "work in progress".into(), code: StatusCode::NOT_IMPLEMENTED.as_u16().into(), data: None, }, @@ -668,7 +684,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Borrowed("Origin required"), + message: "Origin required".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -679,7 +695,7 @@ impl Web3ProxyError { ( StatusCode::FORBIDDEN, JsonRpcErrorData { - message: Cow::Owned(format!("Origin ({}) is not allowed!", origin)), + message: format!("Origin ({}) is not allowed!", origin).into(), code: StatusCode::FORBIDDEN.as_u16().into(), data: None, }, @@ -690,7 +706,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Borrowed("parse bytes error!"), + message: "parse bytes error!".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -701,7 +717,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Borrowed("parse message error!"), + message: "parse message error!".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -712,7 +728,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Borrowed("unable to parse address"), + message: "unable to parse address".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -723,7 +739,7 @@ impl Web3ProxyError { ( StatusCode::PAYMENT_REQUIRED, JsonRpcErrorData { - message: Cow::Borrowed("Payment is required and user is not premium"), + message: "Payment is required and user is not premium".into(), code: StatusCode::PAYMENT_REQUIRED.as_u16().into(), data: None, }, @@ -755,7 +771,7 @@ impl Web3ProxyError { ( StatusCode::TOO_MANY_REQUESTS, JsonRpcErrorData { - message: Cow::Owned(msg), + message: msg.into(), code: StatusCode::TOO_MANY_REQUESTS.as_u16().into(), data: None, }, @@ -766,7 +782,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Borrowed("redis error!"), + message: "redis error!".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -777,7 +793,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Borrowed("Referer required"), + message: "Referer required".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -788,7 +804,7 @@ impl Web3ProxyError { ( StatusCode::FORBIDDEN, JsonRpcErrorData { - message: Cow::Owned(format!("Referer ({:?}) is not allowed", referer)), + message: format!("Referer ({:?}) is not allowed", referer).into(), code: StatusCode::FORBIDDEN.as_u16().into(), data: None, }, @@ -800,7 +816,7 @@ impl Web3ProxyError { StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { // TODO: is it safe to expose all of our anyhow strings? - message: Cow::Borrowed("semaphore acquire error"), + message: "semaphore acquire error".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -811,7 +827,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Borrowed("error stat_sender sending response_stat"), + message: "error stat_sender sending response_stat".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -822,7 +838,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Owned(format!("de/serialization error! {}", err)), + message: format!("de/serialization error! {}", err).into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -840,7 +856,7 @@ impl Web3ProxyError { ( *status_code, JsonRpcErrorData { - message: err_msg.to_owned().into(), + message: err_msg.clone(), code: code.into(), data: None, }, @@ -892,7 +908,7 @@ impl Web3ProxyError { Self::UnknownKey => ( StatusCode::UNAUTHORIZED, JsonRpcErrorData { - message: Cow::Borrowed("unknown api key!"), + message: "unknown api key!".into(), code: StatusCode::UNAUTHORIZED.as_u16().into(), data: None, }, @@ -902,7 +918,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Borrowed("User agent required"), + message: "User agent required".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -913,7 +929,7 @@ impl Web3ProxyError { ( StatusCode::FORBIDDEN, JsonRpcErrorData { - message: Cow::Owned(format!("User agent ({}) is not allowed!", ua)), + message: format!("User agent ({}) is not allowed!", ua).into(), code: StatusCode::FORBIDDEN.as_u16().into(), data: None, }, @@ -925,7 +941,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Borrowed("user ids should always be non-zero"), + message: "user ids should always be non-zero".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -936,7 +952,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Borrowed("verification error!"), + message: "verification error!".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -947,7 +963,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Borrowed("watch recv error!"), + message: "watch recv error!".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -958,7 +974,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Borrowed("watch send error!"), + message: "watch send error!".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -985,7 +1001,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: msg.to_owned().into(), + message: msg.clone(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -1026,11 +1042,11 @@ impl IntoResponse for Web3ProxyError { } pub trait Web3ProxyErrorContext { - fn web3_context>(self, msg: S) -> Result; + fn web3_context>>(self, msg: S) -> Result; } impl Web3ProxyErrorContext for Option { - fn web3_context>(self, msg: S) -> Result { + fn web3_context>>(self, msg: S) -> Result { self.ok_or(Web3ProxyError::WithContext(None, msg.into())) } } @@ -1039,7 +1055,7 @@ impl Web3ProxyErrorContext for Result where E: Into, { - fn web3_context>(self, msg: S) -> Result { + fn web3_context>>(self, msg: S) -> Result { self.map_err(|err| Web3ProxyError::WithContext(Some(Box::new(err.into())), msg.into())) } } diff --git a/web3_proxy/src/frontend/admin.rs b/web3_proxy/src/frontend/admin.rs index b2012448..d64241e1 100644 --- a/web3_proxy/src/frontend/admin.rs +++ b/web3_proxy/src/frontend/admin.rs @@ -354,23 +354,33 @@ pub async fn admin_login_post( // we can't trust that they didn't tamper with the message in some way. like some clients return it hex encoded // TODO: checking 0x seems fragile, but I think it will be fine. siwe message text shouldn't ever start with 0x let their_msg: Message = if payload.msg.starts_with("0x") { - let their_msg_bytes = - Bytes::from_str(&payload.msg).web3_context("parsing payload message")?; + let their_msg_bytes = Bytes::from_str(&payload.msg).map_err(|err| { + Web3ProxyError::BadRequest( + format!("error parsing payload message as Bytes: {}", err).into(), + ) + })?; // TODO: lossy or no? String::from_utf8_lossy(their_msg_bytes.as_ref()) .parse::() - .web3_context("parsing hex string message")? + .map_err(|err| { + Web3ProxyError::BadRequest( + format!("error parsing bytes as siwe message: {}", err).into(), + ) + })? } else { - payload - .msg - .parse::() - .web3_context("parsing string message")? + payload.msg.parse::().map_err(|err| { + Web3ProxyError::BadRequest( + format!("error parsing string as siwe message: {}", err).into(), + ) + })? }; // the only part of the message we will trust is their nonce // TODO: this is fragile. have a helper function/struct for redis keys - let login_nonce = UserBearerToken::from_str(&their_msg.nonce)?; + let login_nonce = UserBearerToken::from_str(&their_msg.nonce).map_err(|err| { + Web3ProxyError::BadRequest(format!("error parsing nonce: {}", err).into()) + })?; // fetch the message we gave them from our database let db_replica = app diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index 9528a189..d5c3e9b5 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -172,7 +172,7 @@ pub async fn serve( .route("/user/deposits", get(users::payment::user_deposits_get)) .route( "/user/balance/:tx_hash", - get(users::payment::user_balance_post), + post(users::payment::user_balance_post), ) .route("/user/keys", get(users::rpc_keys::rpc_keys_get)) .route("/user/keys", post(users::rpc_keys::rpc_keys_management)) diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index e597a9ab..cc8a3225 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -264,7 +264,7 @@ async fn _websocket_handler_with_key( ) { (None, None, _) => Err(Web3ProxyError::StatusCode( StatusCode::BAD_REQUEST, - "this page is for rpcs".to_string(), + "this page is for rpcs".into(), None, )), (Some(redirect_public_url), _, None) => { @@ -277,7 +277,7 @@ async fn _websocket_handler_with_key( // i don't think this is possible Err(Web3ProxyError::StatusCode( StatusCode::UNAUTHORIZED, - "AUTHORIZATION header required".to_string(), + "AUTHORIZATION header required".into(), None, )) } else { @@ -295,7 +295,7 @@ async fn _websocket_handler_with_key( // any other combinations get a simple error _ => Err(Web3ProxyError::StatusCode( StatusCode::BAD_REQUEST, - "this page is for rpcs".to_string(), + "this page is for rpcs".into(), None, )), } @@ -347,84 +347,97 @@ async fn handle_socket_payload( let response_id = json_request.id.clone(); // TODO: move this to a seperate function so we can use the try operator - let response: Web3ProxyResult = - match &json_request.method[..] { - "eth_subscribe" => { - // TODO: how can we subscribe with proxy_mode? - match app - .eth_subscribe( - authorization.clone(), - json_request, - subscription_count, - response_sender.clone(), - ) - .await - { - Ok((handle, response)) => { - { - let mut x = subscriptions.write().await; - - let result: &serde_json::value::RawValue = response - .result - .as_ref() - .context("there should be a result here")?; - - // TODO: there must be a better way to turn a RawValue - let k: U64 = serde_json::from_str(result.get()) - .context("subscription ids must be U64s")?; - - x.insert(k, handle); - }; - - Ok(response.into()) - } - Err(err) => Err(err), - } - } - "eth_unsubscribe" => { - let request_metadata = - RequestMetadata::new(&app, authorization.clone(), &json_request, None) - .await; - - #[derive(serde::Deserialize)] - struct EthUnsubscribeParams([U64; 1]); - - match serde_json::from_value(json_request.params) { - Ok::(params) => { - let subscription_id = ¶ms.0[0]; - - // TODO: is this the right response? - let partial_response = { - let mut x = subscriptions.write().await; - match x.remove(subscription_id) { - None => false, - Some(handle) => { - handle.abort(); - true - } - } - }; - - // TODO: don't create the response here. use a JsonRpcResponseData instead - let response = JsonRpcForwardedResponse::from_value( - json!(partial_response), - response_id.clone(), - ); - - request_metadata.add_response(&response); - - Ok(response.into()) - } - Err(err) => Err(Web3ProxyError::BadRequest( - f!("incorrect params given for eth_unsubscribe. {err:?}").into(), - )), - } - } - _ => app - .proxy_web3_rpc(authorization.clone(), json_request.into()) + let response: Web3ProxyResult = match &json_request.method + [..] + { + "eth_subscribe" => { + // TODO: how can we subscribe with proxy_mode? + match app + .eth_subscribe( + authorization.clone(), + json_request, + subscription_count, + response_sender.clone(), + ) .await - .map(|(_, response, _)| response), - }; + { + Ok((handle, response)) => { + { + let mut x = subscriptions.write().await; + + let result: &serde_json::value::RawValue = response + .result + .as_ref() + .context("there should be a result here")?; + + // TODO: there must be a better way to turn a RawValue + let k: U64 = serde_json::from_str(result.get()) + .context("subscription ids must be U64s")?; + + x.insert(k, handle); + }; + + Ok(response.into()) + } + Err(err) => Err(err), + } + } + "eth_unsubscribe" => { + let request_metadata = + RequestMetadata::new(&app, authorization.clone(), &json_request, None) + .await; + + let subscription_id: U64 = if json_request.params.is_array() { + if let Some(params) = json_request.params.get(0) { + serde_json::from_value(params.clone()).map_err(|err| { + Web3ProxyError::BadRequest( + format!("invalid params for eth_unsubscribe: {}", err).into(), + ) + })? + } else { + return Err(Web3ProxyError::BadRequest( + f!("no params for eth_unsubscribe").into(), + )); + } + } else if json_request.params.is_string() { + serde_json::from_value(json_request.params).map_err(|err| { + Web3ProxyError::BadRequest( + format!("invalid params for eth_unsubscribe: {}", err).into(), + ) + })? + } else { + return Err(Web3ProxyError::BadRequest( + "unexpected params given for eth_unsubscribe".into(), + )); + }; + + // TODO: is this the right response? + let partial_response = { + let mut x = subscriptions.write().await; + match x.remove(&subscription_id) { + None => false, + Some(handle) => { + handle.abort(); + true + } + } + }; + + // TODO: don't create the response here. use a JsonRpcResponseData instead + let response = JsonRpcForwardedResponse::from_value( + json!(partial_response), + response_id.clone(), + ); + + request_metadata.add_response(&response); + + Ok(response.into()) + } + _ => app + .proxy_web3_rpc(authorization.clone(), json_request.into()) + .await + .map(|(_, response, _)| response), + }; (response_id, response) } diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs index f189301a..7d553061 100644 --- a/web3_proxy/src/frontend/status.rs +++ b/web3_proxy/src/frontend/status.rs @@ -145,6 +145,7 @@ async fn _status(app: Arc) -> (StatusCode, &'static str, Bytes) { "private_rpcs": app.private_rpcs, "rpc_secret_key_cache": MokaCacheSerializer(&app.rpc_secret_key_cache), "user_balance_cache": MokaCacheSerializer(&app.user_balance_cache), + "payment_factory_address": app.config.deposit_factory_contract, "version": APP_USER_AGENT, }); diff --git a/web3_proxy/src/frontend/users/authentication.rs b/web3_proxy/src/frontend/users/authentication.rs index 44cfde21..c283d979 100644 --- a/web3_proxy/src/frontend/users/authentication.rs +++ b/web3_proxy/src/frontend/users/authentication.rs @@ -18,11 +18,11 @@ use entities::{balance, login, pending_login, referee, referrer, rpc_key, user}; use ethers::{prelude::Address, types::Bytes}; use hashbrown::HashMap; use http::StatusCode; -use log::{debug, warn, trace}; +use log::{debug, trace, warn}; use migration::sea_orm::prelude::{Decimal, Uuid}; use migration::sea_orm::{ - self, ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, - TransactionTrait, + self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel, + QueryFilter, TransactionTrait, }; use serde_json::json; use siwe::{Message, VerificationOpts}; @@ -143,6 +143,54 @@ pub async fn user_login_get( Ok(message.into_response()) } +pub async fn register_new_user( + db_conn: &DatabaseConnection, + address: Address, +) -> anyhow::Result<(user::Model, rpc_key::Model, balance::Model)> { + // all or nothing + let txn = db_conn.begin().await?; + + // the only thing we need from them is an address + // everything else is optional + // TODO: different invite codes should allow different levels + // TODO: maybe decrement a count on the invite code? + // TODO: There will be two different transactions. The first one inserts the user, the second one marks the user as being referred + let new_user = user::ActiveModel { + address: sea_orm::Set(address.to_fixed_bytes().into()), + ..Default::default() + }; + + let new_user = new_user.insert(&txn).await?; + + // create the user's first api key + let rpc_secret_key = RpcSecretKey::new(); + + let user_rpc_key = rpc_key::ActiveModel { + user_id: sea_orm::Set(new_user.id), + secret_key: sea_orm::Set(rpc_secret_key.into()), + description: sea_orm::Set(None), + ..Default::default() + }; + + let user_rpc_key = user_rpc_key + .insert(&txn) + .await + .web3_context("Failed saving new user key")?; + + // create an empty balance entry + let user_balance = balance::ActiveModel { + user_id: sea_orm::Set(new_user.id), + ..Default::default() + }; + + let user_balance = user_balance.insert(&txn).await?; + + // save the user and key and balance to the database + txn.commit().await?; + + Ok((new_user, user_rpc_key, user_balance)) +} + /// `POST /user/login` - Register or login by posting a signed "siwe" message. /// It is recommended to save the returned bearer token in a cookie. /// The bearer token can be used to authenticate other requests, such as getting the user's stats or modifying the user's profile. @@ -264,50 +312,8 @@ pub async fn user_login_post( } } - let txn = db_conn.begin().await?; - - // First add a user - - // the only thing we need from them is an address - // everything else is optional - // TODO: different invite codes should allow different levels - // TODO: maybe decrement a count on the invite code? - // TODO: There will be two different transactions. The first one inserts the user, the second one marks the user as being referred - let caller = user::ActiveModel { - address: sea_orm::Set(our_msg.address.into()), - ..Default::default() - }; - - let caller = caller.insert(&txn).await?; - - // create the user's first api key - let rpc_secret_key = RpcSecretKey::new(); - - let user_rpc_key = rpc_key::ActiveModel { - user_id: sea_orm::Set(caller.id), - secret_key: sea_orm::Set(rpc_secret_key.into()), - description: sea_orm::Set(None), - ..Default::default() - }; - - let user_rpc_key = user_rpc_key - .insert(&txn) - .await - .web3_context("Failed saving new user key")?; - - // We should also create the balance entry ... - let user_balance = balance::ActiveModel { - user_id: sea_orm::Set(caller.id), - ..Default::default() - }; - user_balance.insert(&txn).await?; - - let user_rpc_keys = vec![user_rpc_key]; - - // Also add a part for the invite code, i.e. who invited this guy - - // save the user and key to the database - txn.commit().await?; + let (caller, caller_key, _) = + register_new_user(&db_conn, our_msg.address.into()).await?; let txn = db_conn.begin().await?; // First, optionally catch a referral code from the parameters if there is any @@ -336,7 +342,7 @@ pub async fn user_login_post( } txn.commit().await?; - (caller, user_rpc_keys, StatusCode::CREATED) + (caller, vec![caller_key], StatusCode::CREATED) } Some(caller) => { // Let's say that a user that exists can actually also redeem a key in retrospect... diff --git a/web3_proxy/src/frontend/users/payment.rs b/web3_proxy/src/frontend/users/payment.rs index 60bb7e2d..cd5a9a33 100644 --- a/web3_proxy/src/frontend/users/payment.rs +++ b/web3_proxy/src/frontend/users/payment.rs @@ -1,5 +1,7 @@ use crate::app::Web3ProxyApp; use crate::errors::{Web3ProxyError, Web3ProxyResponse}; +use crate::frontend::authorization::login_is_authorized; +use crate::frontend::users::authentication::register_new_user; use anyhow::Context; use axum::{ extract::Path, @@ -7,13 +9,13 @@ use axum::{ response::IntoResponse, Extension, Json, TypedHeader, }; +use axum_client_ip::InsecureClientIp; use axum_macros::debug_handler; use entities::{balance, increase_on_chain_balance_receipt, rpc_key, user}; use ethbloom::Input as BloomInput; use ethers::abi::AbiEncode; use ethers::types::{Address, TransactionReceipt, ValueOrArray, H256}; use hashbrown::HashMap; -// use http::StatusCode; use http::StatusCode; use log::{debug, info, trace}; use migration::sea_orm::prelude::Decimal; @@ -21,7 +23,6 @@ use migration::sea_orm::{ self, ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, TransactionTrait, }; use migration::{Expr, OnConflict}; -use num_traits::Pow; use payment_contracts::ierc20::IERC20; use payment_contracts::payment_factory::{self, PaymentFactory}; use serde_json::json; @@ -107,13 +108,13 @@ pub async fn user_deposits_get( #[debug_handler] pub async fn user_balance_post( Extension(app): Extension>, - TypedHeader(Authorization(bearer)): TypedHeader>, + InsecureClientIp(ip): InsecureClientIp, Path(mut params): Path>, ) -> Web3ProxyResponse { // I suppose this is ok / good, so people don't spam this endpoint as it is not "cheap" - // Check that the user is logged-in and authorized - // The semaphore keeps a user from submitting tons of transactions in parallel which would DOS our backends - let (_, _semaphore) = app.bearer_is_authorized(bearer).await?; + // we rate limit by ip instead of bearer token so transactions are easy to submit from scripts + // TODO: if ip is a 10. or a 172., allow unlimited + login_is_authorized(&app, ip).await?; // Get the transaction hash, and the amount that the user wants to top up by. // Let's say that for now, 1 credit is equivalent to 1 dollar (assuming any stablecoin has a 1:1 peg) @@ -169,12 +170,13 @@ pub async fn user_balance_post( PaymentFactory::new(payment_factory_address, app.internal_provider().clone()); debug!( - "Payment Factor Filter is: {:?}", + "Payment Factory Filter: {:?}", payment_factory_contract.payment_received_filter() ); // check bloom filter to be sure this transaction contains any relevant logs // TODO: This does not work properly right now, get back this eventually + // TODO: compare to code in llamanodes/web3-this-then-that // if let Some(ValueOrArray::Value(Some(x))) = payment_factory_contract // .payment_received_filter() // .filter @@ -241,11 +243,8 @@ pub async fn user_balance_post( payment_token_amount.set_scale(payment_token_decimals)?; info!( - "Found deposit transaction for: {:?} {:?} {:?} {:?}", - &recipient_account.to_fixed_bytes(), - recipient_account, - payment_token_address, - payment_token_amount + "Found deposit transaction for: {:?} {:?} {:?}", + recipient_account, payment_token_address, payment_token_amount ); let recipient = match user::Entity::find() @@ -253,12 +252,13 @@ pub async fn user_balance_post( .one(&db_conn) .await? { - Some(x) => Ok(x), + Some(x) => x, None => { - // todo!("make their account"); - Err(Web3ProxyError::AccessDenied) + let (user, _, _) = register_new_user(&db_conn, recipient_account).await?; + + user } - }?; + }; // For now we only accept stablecoins // And we hardcode the peg (later we would have to depeg this, for example diff --git a/web3_proxy/src/jsonrpc.rs b/web3_proxy/src/jsonrpc.rs index 9b67c508..42fcc2ae 100644 --- a/web3_proxy/src/jsonrpc.rs +++ b/web3_proxy/src/jsonrpc.rs @@ -211,7 +211,7 @@ impl From<&'static str> for JsonRpcErrorData { fn from(value: &'static str) -> Self { Self { code: -32000, - message: Cow::Borrowed(value), + message: value.into(), data: None, } } @@ -221,7 +221,7 @@ impl From for JsonRpcErrorData { fn from(value: String) -> Self { Self { code: -32000, - message: Cow::Owned(value), + message: value.into(), data: None, } } @@ -273,7 +273,7 @@ impl JsonRpcForwardedResponse { result: None, error: Some(JsonRpcErrorData { code: code.unwrap_or(-32099), - message: Cow::Owned(message), + message: message.into(), // TODO: accept data as an argument data: None, }), @@ -319,10 +319,7 @@ impl JsonRpcForwardedResponse { data = err.data.clone(); } else if let Some(err) = err.as_serde_error() { // this is not an rpc error. keep it as an error - return Err(Web3ProxyError::BadResponse(format!( - "bad response: {}", - err - ))); + return Err(Web3ProxyError::BadResponse(err.to_string().into())); } else { return Err(anyhow::anyhow!("unexpected ethers error! {:?}", err).into()); } @@ -336,7 +333,7 @@ impl JsonRpcForwardedResponse { result: None, error: Some(JsonRpcErrorData { code, - message: Cow::Owned(message), + message: message.into(), data, }), }) diff --git a/web3_proxy/src/response_cache.rs b/web3_proxy/src/response_cache.rs index f9541672..3e68267c 100644 --- a/web3_proxy/src/response_cache.rs +++ b/web3_proxy/src/response_cache.rs @@ -5,7 +5,6 @@ use hashbrown::hash_map::DefaultHashBuilder; use moka::future::Cache; use serde_json::value::RawValue; use std::{ - borrow::Cow, hash::{BuildHasher, Hash, Hasher}, sync::Arc, }; @@ -211,10 +210,7 @@ impl TryFrom for JsonRpcErrorData { data = err.data.clone(); } else if let Some(err) = err.as_serde_error() { // this is not an rpc error. keep it as an error - return Err(Web3ProxyError::BadResponse(format!( - "bad response: {}", - err - ))); + return Err(Web3ProxyError::BadResponse(err.to_string().into())); } else { return Err(anyhow::anyhow!("unexpected ethers error! {:?}", err).into()); } @@ -224,7 +220,7 @@ impl TryFrom for JsonRpcErrorData { Ok(JsonRpcErrorData { code, - message: Cow::Owned(message), + message: message.into(), data, }) } diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index f1fe2e80..d8fd4255 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -403,9 +403,11 @@ impl Web3Rpcs { let num_active_rpcs = consensus_finder.len(); let total_rpcs = self.by_name.load().len(); + let new_consensus_rpcs = Arc::new(new_consensus_rpcs); + let old_consensus_head_connections = self .watch_consensus_rpcs_sender - .send_replace(Some(Arc::new(new_consensus_rpcs))); + .send_replace(Some(new_consensus_rpcs.clone())); let backups_voted_str = if backups_needed { "B " } else { "" }; @@ -494,7 +496,7 @@ impl Web3Rpcs { } Ordering::Less => { // this is unlikely but possible - // TODO: better log + // TODO: better log that includes all the votes warn!( "chain rolled back {}/{} {}{}/{}/{} con={} old={} rpc={}@{}", consensus_tier, diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 4d54613f..ff9d2073 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -3,27 +3,31 @@ use super::many::Web3Rpcs; use super::one::Web3Rpc; use crate::errors::{Web3ProxyErrorContext, Web3ProxyResult}; use crate::frontend::authorization::Authorization; +use anyhow::Context; +use base64::engine::general_purpose; use derive_more::Constructor; use ethers::prelude::{H256, U64}; use hashbrown::{HashMap, HashSet}; +use hdrhistogram::serialization::{Serializer, V2DeflateSerializer}; +use hdrhistogram::Histogram; use itertools::{Itertools, MinMaxResult}; -use log::{trace, warn}; +use log::{log_enabled, trace, warn, Level}; use moka::future::Cache; use serde::Serialize; use std::cmp::{Ordering, Reverse}; use std::collections::BTreeMap; use std::fmt; -use std::sync::Arc; +use std::sync::{atomic, Arc}; use tokio::time::Instant; #[derive(Clone, Serialize)] -struct RpcData { +struct ConsensusRpcData { head_block_num: U64, // TODO: this is too simple. erigon has 4 prune levels (hrct) oldest_block_num: U64, } -impl RpcData { +impl ConsensusRpcData { fn new(rpc: &Web3Rpc, head: &Web3ProxyBlock) -> Self { let head_block_num = *head.number(); @@ -45,13 +49,13 @@ impl RpcData { #[derive(Constructor, Clone, Copy, Debug, Default, Eq, PartialEq, Serialize)] pub struct RpcRanking { - tier: u64, + tier: u8, backup: bool, head_num: Option, } impl RpcRanking { - pub fn add_offset(&self, offset: u64) -> Self { + pub fn add_offset(&self, offset: u8) -> Self { Self { tier: self.tier + offset, backup: self.backup, @@ -66,9 +70,12 @@ impl RpcRanking { } } - fn sort_key(&self) -> (u64, bool, Reverse>) { + fn sort_key(&self) -> (bool, u8, Reverse>) { // TODO: add soft_limit here? add peak_ewma here? - (self.tier, !self.backup, Reverse(self.head_num)) + // TODO: should backup or tier be checked first? now that tiers are automated, backups + // TODO: should we include a random number in here? + // TODO: should we include peak_ewma_latency or weighted_peak_ewma_latency? + (!self.backup, self.tier, Reverse(self.head_num)) } } @@ -94,9 +101,10 @@ pub enum ShouldWaitForBlock { /// A collection of Web3Rpcs that are on the same block. /// Serialize is so we can print it on our /status endpoint +/// TODO: one data structure of head_rpcs and other_rpcs that is sorted best first #[derive(Clone, Serialize)] pub struct ConsensusWeb3Rpcs { - pub(crate) tier: u64, + pub(crate) tier: u8, pub(crate) backups_needed: bool, // TODO: this is already inside best_rpcs. Don't skip, instead make a shorter serialize @@ -112,7 +120,7 @@ pub struct ConsensusWeb3Rpcs { // TODO: make this work. the key needs to be a string. I think we need `serialize_with` #[serde(skip_serializing)] - rpc_data: HashMap, RpcData>, + rpc_data: HashMap, ConsensusRpcData>, } impl ConsensusWeb3Rpcs { @@ -332,26 +340,20 @@ type FirstSeenCache = Cache; /// A ConsensusConnections builder that tracks all connection heads across multiple groups of servers pub struct ConsensusFinder { - /// backups for all tiers are only used if necessary - /// `tiers[0] = only tier 0` - /// `tiers[1] = tier 0 and tier 1` - /// `tiers[n] = tier 0..=n` rpc_heads: HashMap, Web3ProxyBlock>, /// never serve blocks that are too old max_block_age: Option, /// tier 0 will be prefered as long as the distance between it and the other tiers is <= max_tier_lag max_block_lag: Option, - /// used to track rpc.head_latency. The same cache should be shared between all ConnectionsGroups + /// Block Hash -> First Seen Instant. used to track rpc.head_latency. The same cache should be shared between all ConnectionsGroups first_seen: FirstSeenCache, } impl ConsensusFinder { pub fn new(max_block_age: Option, max_block_lag: Option) -> Self { // TODO: what's a good capacity for this? it shouldn't need to be very large - // TODO: if we change Web3ProxyBlock to store the instance, i think we could use the block_by_hash cache let first_seen = Cache::new(16); - // TODO: hard coding 0-9 isn't great, but its easier than refactoring this to be smart about config reloading let rpc_heads = HashMap::new(); Self { @@ -433,11 +435,99 @@ impl ConsensusFinder { Ok(changed) } + pub async fn update_tiers(&mut self) -> Web3ProxyResult<()> { + match self.rpc_heads.len() { + 0 => {} + 1 => { + for rpc in self.rpc_heads.keys() { + rpc.tier.store(0, atomic::Ordering::Relaxed) + } + } + _ => { + // iterate first to find bounds + let mut min_latency = u64::MAX; + let mut max_latency = u64::MIN; + let mut weighted_latencies = HashMap::new(); + for rpc in self.rpc_heads.keys() { + let weighted_latency_seconds = rpc.weighted_peak_ewma_seconds(); + + let weighted_latency_ms = (weighted_latency_seconds * 1000.0).round() as i64; + + let weighted_latency_ms: u64 = weighted_latency_ms + .try_into() + .context("weighted_latency_ms does not fit in a u64")?; + + min_latency = min_latency.min(weighted_latency_ms); + max_latency = min_latency.max(weighted_latency_ms); + + weighted_latencies.insert(rpc, weighted_latency_ms); + } + + // // histogram requires high to be at least 2 x low + // // using min_latency for low does not work how we want it though + // max_latency = max_latency.max(2 * min_latency); + + // create the histogram + let mut hist = Histogram::::new_with_bounds(1, max_latency, 3).unwrap(); + + // TODO: resize shouldn't be necessary, but i've seen it error + hist.auto(true); + + for weighted_latency_ms in weighted_latencies.values() { + hist.record(*weighted_latency_ms)?; + } + + // dev logging + if log_enabled!(Level::Trace) { + // print the histogram. see docs/histograms.txt for more info + let mut encoder = + base64::write::EncoderWriter::new(Vec::new(), &general_purpose::STANDARD); + + V2DeflateSerializer::new() + .serialize(&hist, &mut encoder) + .unwrap(); + + let encoded = encoder.finish().unwrap(); + + let encoded = String::from_utf8(encoded).unwrap(); + + trace!("weighted_latencies: {}", encoded); + } + + // TODO: get someone who is better at math to do something smarter + // this is not a very good use of stddev, but it works for now + let stddev = hist.stdev(); + + for (rpc, weighted_latency_ms) in weighted_latencies.into_iter() { + let tier = (weighted_latency_ms - min_latency) as f64 / stddev; + + let tier = tier.floor() as u64; + + let tier = tier.clamp(u8::MIN.into(), u8::MAX.into()) as u8; + + // TODO: this should be trace + trace!( + "{} - weighted_latency: {}ms, tier {}", + rpc, + weighted_latency_ms, + tier + ); + + rpc.tier.store(tier, atomic::Ordering::Relaxed); + } + } + } + + Ok(()) + } + pub async fn find_consensus_connections( &mut self, authorization: &Arc, web3_rpcs: &Web3Rpcs, ) -> Web3ProxyResult> { + self.update_tiers().await?; + let minmax_block = self.rpc_heads.values().minmax_by_key(|&x| x.number()); let (lowest_block, highest_block) = match minmax_block { @@ -459,11 +549,12 @@ impl ConsensusFinder { let lowest_block_number = lowest_block.number().max(&max_lag_block_number); + // TODO: should lowest block number be set such that the rpc won't ever go backwards? trace!("safe lowest_block_number: {}", lowest_block_number); let num_known = self.rpc_heads.len(); - if num_known < web3_rpcs.min_head_rpcs { + if num_known < web3_rpcs.min_synced_rpcs { // this keeps us from serving requests when the proxy first starts trace!("not enough servers known"); return Ok(None); @@ -477,13 +568,14 @@ impl ConsensusFinder { let mut backup_consensus = None; let mut rpc_heads_by_tier: Vec<_> = self.rpc_heads.iter().collect(); - rpc_heads_by_tier.sort_by_cached_key(|(rpc, _)| rpc.tier); + rpc_heads_by_tier.sort_by_cached_key(|(rpc, _)| rpc.tier.load(atomic::Ordering::Relaxed)); let current_tier = rpc_heads_by_tier .first() .expect("rpc_heads_by_tier should never be empty") .0 - .tier; + .tier + .load(atomic::Ordering::Relaxed); // trace!("first_tier: {}", current_tier); @@ -492,7 +584,9 @@ impl ConsensusFinder { // loop over all the rpc heads (grouped by tier) and their parents to find consensus // TODO: i'm sure theres a lot of shortcuts that could be taken, but this is simplest to implement for (rpc, rpc_head) in rpc_heads_by_tier.into_iter() { - if current_tier != rpc.tier { + let rpc_tier = rpc.tier.load(atomic::Ordering::Relaxed); + + if current_tier != rpc_tier { // we finished processing a tier. check for primary results if let Some(consensus) = self.count_votes(&primary_votes, web3_rpcs) { trace!("found enough votes on tier {}", current_tier); @@ -581,7 +675,7 @@ impl ConsensusFinder { continue; } // TODO: different mins for backup vs primary - if rpc_names.len() < web3_rpcs.min_head_rpcs { + if rpc_names.len() < web3_rpcs.min_synced_rpcs { continue; } @@ -593,14 +687,14 @@ impl ConsensusFinder { .filter_map(|x| web3_rpcs.get(x)) .collect(); - if consensus_rpcs.len() < web3_rpcs.min_head_rpcs { + if consensus_rpcs.len() < web3_rpcs.min_synced_rpcs { continue; } // consensus found! let tier = consensus_rpcs .iter() - .map(|x| x.tier) + .map(|x| x.tier.load(atomic::Ordering::Relaxed)) .max() .expect("there should always be a max"); @@ -615,7 +709,11 @@ impl ConsensusFinder { { let x_head_num = *x_head.number(); - let key: RpcRanking = RpcRanking::new(x.tier, x.backup, Some(x_head_num)); + let key: RpcRanking = RpcRanking::new( + x.tier.load(atomic::Ordering::Relaxed), + x.backup, + Some(x_head_num), + ); other_rpcs .entry(key) @@ -627,7 +725,7 @@ impl ConsensusFinder { let mut rpc_data = HashMap::with_capacity(self.rpc_heads.len()); for (x, x_head) in self.rpc_heads.iter() { - let y = RpcData::new(x, x_head); + let y = ConsensusRpcData::new(x, x_head); rpc_data.insert(x.clone(), y); } @@ -647,8 +745,11 @@ impl ConsensusFinder { None } - pub fn worst_tier(&self) -> Option { - self.rpc_heads.iter().map(|(x, _)| x.tier).max() + pub fn worst_tier(&self) -> Option { + self.rpc_heads + .iter() + .map(|(x, _)| x.tier.load(atomic::Ordering::Relaxed)) + .max() } } diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index d6e8c030..f47da3a7 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -23,17 +23,14 @@ use itertools::Itertools; use log::{debug, error, info, trace, warn}; use migration::sea_orm::DatabaseConnection; use moka::future::{Cache, CacheBuilder}; -use ordered_float::OrderedFloat; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::json; use serde_json::value::RawValue; -use std::borrow::Cow; -use std::cmp::{min_by_key, Reverse}; +use std::cmp::min_by_key; use std::fmt::{self, Display}; use std::sync::atomic::Ordering; use std::sync::Arc; -use thread_fast_rng::rand::seq::SliceRandom; use tokio::select; use tokio::sync::{broadcast, watch}; use tokio::time::{sleep, sleep_until, Duration, Instant}; @@ -63,7 +60,7 @@ pub struct Web3Rpcs { /// blocks on the heaviest chain pub(super) blocks_by_number: BlocksByNumberCache, /// the number of rpcs required to agree on consensus for the head block (thundering herd protection) - pub(super) min_head_rpcs: usize, + pub(super) min_synced_rpcs: usize, /// the soft limit required to agree on consensus for the head block. (thundering herd protection) pub(super) min_sum_soft_limit: u32, /// how far behind the highest known block height we can be before we stop serving requests @@ -123,7 +120,7 @@ impl Web3Rpcs { by_name, max_block_age, max_block_lag, - min_head_rpcs, + min_synced_rpcs: min_head_rpcs, min_sum_soft_limit, name, pending_transaction_cache, @@ -256,10 +253,10 @@ impl Web3Rpcs { let num_rpcs = self.by_name.load().len(); - if num_rpcs < self.min_head_rpcs { + if num_rpcs < self.min_synced_rpcs { return Err(Web3ProxyError::NotEnoughRpcs { num_known: num_rpcs, - min_head_rpcs: self.min_head_rpcs, + min_head_rpcs: self.min_synced_rpcs, }); } @@ -279,7 +276,7 @@ impl Web3Rpcs { } pub fn min_head_rpcs(&self) -> usize { - self.min_head_rpcs + self.min_synced_rpcs } /// subscribe to blocks and transactions from all the backend rpcs. @@ -440,7 +437,7 @@ impl Web3Rpcs { trace!("{} vs {}", rpc_a, rpc_b); // TODO: cached key to save a read lock // TODO: ties to the server with the smallest block_data_limit - let faster_rpc = min_by_key(rpc_a, rpc_b, |x| x.peak_ewma()); + let faster_rpc = min_by_key(rpc_a, rpc_b, |x| x.weighted_peak_ewma_seconds()); trace!("winner: {}", faster_rpc); // add to the skip list in case this one fails @@ -523,8 +520,8 @@ impl Web3Rpcs { .cloned() .collect(); - // TODO: include tiers in this? - potential_rpcs.shuffle(&mut thread_fast_rng::thread_fast_rng()); + potential_rpcs + .sort_by_cached_key(|x| x.shuffle_for_load_balancing_on(max_block_needed.copied())); match self ._best_available_rpc(&authorization, error_handler, &potential_rpcs, skip_rpcs) @@ -572,10 +569,11 @@ impl Web3Rpcs { .cloned(), ); - potential_rpcs.shuffle(&mut thread_fast_rng::thread_fast_rng()); - - if potential_rpcs.len() >= self.min_head_rpcs { + if potential_rpcs.len() >= self.min_synced_rpcs { // we have enough potential rpcs. try to load balance + potential_rpcs.sort_by_cached_key(|x| { + x.shuffle_for_load_balancing_on(max_block_needed.copied()) + }); match self ._best_available_rpc( @@ -604,8 +602,7 @@ impl Web3Rpcs { } for next_rpcs in consensus_rpcs.other_rpcs.values() { - // we have to collect in order to shuffle - let mut more_rpcs: Vec<_> = next_rpcs + let more_rpcs = next_rpcs .iter() .filter(|rpc| { consensus_rpcs.rpc_will_work_now( @@ -615,16 +612,16 @@ impl Web3Rpcs { rpc, ) }) - .cloned() - .collect(); + .cloned(); - // shuffle only the new entries. that way the highest tier still gets preference - more_rpcs.shuffle(&mut thread_fast_rng::thread_fast_rng()); + potential_rpcs.extend(more_rpcs); - potential_rpcs.extend(more_rpcs.into_iter()); - - if potential_rpcs.len() >= self.min_head_rpcs { + if potential_rpcs.len() >= self.min_synced_rpcs { // we have enough potential rpcs. try to load balance + potential_rpcs.sort_by_cached_key(|x| { + x.shuffle_for_load_balancing_on(max_block_needed.copied()) + }); + match self ._best_available_rpc( &authorization, @@ -654,6 +651,10 @@ impl Web3Rpcs { if !potential_rpcs.is_empty() { // even after scanning all the tiers, there are not enough rpcs that can serve this request. try anyways + potential_rpcs.sort_by_cached_key(|x| { + x.shuffle_for_load_balancing_on(max_block_needed.copied()) + }); + match self ._best_available_rpc( &authorization, @@ -760,14 +761,14 @@ impl Web3Rpcs { }; // synced connections are all on the same block. sort them by tier with higher soft limits first - synced_rpcs.sort_by_cached_key(rpc_sync_status_sort_key); + synced_rpcs.sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed.copied())); trace!("synced_rpcs: {:#?}", synced_rpcs); // if there aren't enough synced connections, include more connections // TODO: only do this sorting if the synced_rpcs isn't enough let mut all_rpcs: Vec<_> = self.by_name.load().values().cloned().collect(); - all_rpcs.sort_by_cached_key(rpc_sync_status_sort_key); + all_rpcs.sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed.copied())); trace!("all_rpcs: {:#?}", all_rpcs); @@ -1077,7 +1078,7 @@ impl Web3Rpcs { // TODO: what error code? // cloudflare gives {"jsonrpc":"2.0","error":{"code":-32043,"message":"Requested data cannot be older than 128 blocks."},"id":1} Err(JsonRpcErrorData { - message: Cow::Borrowed("Requested data is not available"), + message: "Requested data is not available".into(), code: -32043, data: None, } @@ -1284,26 +1285,6 @@ impl Serialize for Web3Rpcs { } } -/// sort by block number (descending) and tier (ascending) -/// TODO: should this be moved into a `impl Web3Rpc`? -/// TODO: i think we still have sorts scattered around the code that should use this -/// TODO: take AsRef or something like that? We don't need an Arc here -fn rpc_sync_status_sort_key(x: &Arc) -> (Reverse, u64, bool, OrderedFloat) { - let head_block = x - .head_block - .as_ref() - .and_then(|x| x.borrow().as_ref().map(|x| *x.number())) - .unwrap_or_default(); - - let tier = x.tier; - - let peak_ewma = x.peak_ewma(); - - let backup = x.backup; - - (Reverse(head_block), tier, backup, peak_ewma) -} - mod tests { #![allow(unused_imports)] @@ -1326,8 +1307,14 @@ mod tests { PeakEwmaLatency::spawn(Duration::from_secs(1), 4, Duration::from_secs(1)) } - #[tokio::test] + #[tokio::test(start_paused = true)] async fn test_sort_connections_by_sync_status() { + let _ = env_logger::builder() + .filter_level(LevelFilter::Error) + .filter_module("web3_proxy", LevelFilter::Trace) + .is_test(true) + .try_init(); + let block_0 = Block { number: Some(0.into()), hash: Some(H256::random()), @@ -1361,42 +1348,42 @@ mod tests { let mut rpcs: Vec<_> = [ Web3Rpc { name: "a".to_string(), - tier: 0, + tier: 0.into(), head_block: Some(tx_a), peak_latency: Some(new_peak_latency()), ..Default::default() }, Web3Rpc { name: "b".to_string(), - tier: 0, + tier: 0.into(), head_block: Some(tx_b), peak_latency: Some(new_peak_latency()), ..Default::default() }, Web3Rpc { name: "c".to_string(), - tier: 0, + tier: 0.into(), head_block: Some(tx_c), peak_latency: Some(new_peak_latency()), ..Default::default() }, Web3Rpc { name: "d".to_string(), - tier: 1, + tier: 1.into(), head_block: Some(tx_d), peak_latency: Some(new_peak_latency()), ..Default::default() }, Web3Rpc { name: "e".to_string(), - tier: 1, + tier: 1.into(), head_block: Some(tx_e), peak_latency: Some(new_peak_latency()), ..Default::default() }, Web3Rpc { name: "f".to_string(), - tier: 1, + tier: 1.into(), head_block: Some(tx_f), peak_latency: Some(new_peak_latency()), ..Default::default() @@ -1406,11 +1393,11 @@ mod tests { .map(Arc::new) .collect(); - rpcs.sort_by_cached_key(rpc_sync_status_sort_key); + rpcs.sort_by_cached_key(|x| x.sort_for_load_balancing_on(None)); let names_in_sort_order: Vec<_> = rpcs.iter().map(|x| x.name.as_str()).collect(); - assert_eq!(names_in_sort_order, ["c", "f", "b", "e", "a", "d"]); + assert_eq!(names_in_sort_order, ["c", "b", "a", "f", "e", "d"]); } #[tokio::test] @@ -1452,7 +1439,7 @@ mod tests { automatic_block_limit: false, backup: false, block_data_limit: block_data_limit.into(), - tier: 0, + // tier: 0, head_block: Some(tx_synced), peak_latency: Some(new_peak_latency()), ..Default::default() @@ -1466,7 +1453,7 @@ mod tests { automatic_block_limit: false, backup: false, block_data_limit: block_data_limit.into(), - tier: 0, + // tier: 0, head_block: Some(tx_lagged), peak_latency: Some(new_peak_latency()), ..Default::default() @@ -1513,7 +1500,7 @@ mod tests { max_block_age: None, // TODO: test max_block_lag? max_block_lag: None, - min_head_rpcs: 1, + min_synced_rpcs: 1, min_sum_soft_limit: 1, }; @@ -1736,7 +1723,7 @@ mod tests { automatic_block_limit: false, backup: false, block_data_limit: 64.into(), - tier: 1, + // tier: 1, head_block: Some(tx_pruned), ..Default::default() }; @@ -1749,7 +1736,7 @@ mod tests { automatic_block_limit: false, backup: false, block_data_limit: u64::MAX.into(), - tier: 2, + // tier: 2, head_block: Some(tx_archive), ..Default::default() }; @@ -1789,7 +1776,7 @@ mod tests { blocks_by_number: CacheBuilder::new(100) .time_to_live(Duration::from_secs(120)) .build(), - min_head_rpcs: 1, + min_synced_rpcs: 1, min_sum_soft_limit: 4_000, max_block_age: None, max_block_lag: None, @@ -1918,7 +1905,7 @@ mod tests { automatic_block_limit: false, backup: false, block_data_limit: 64.into(), - tier: 0, + // tier: 0, head_block: Some(tx_mock_geth), peak_latency: Some(new_peak_latency()), ..Default::default() @@ -1930,7 +1917,7 @@ mod tests { automatic_block_limit: false, backup: false, block_data_limit: u64::MAX.into(), - tier: 1, + // tier: 1, head_block: Some(tx_mock_erigon_archive), peak_latency: Some(new_peak_latency()), ..Default::default() @@ -1969,7 +1956,7 @@ mod tests { pending_tx_id_sender, blocks_by_hash: Cache::new(10_000), blocks_by_number: Cache::new(10_000), - min_head_rpcs: 1, + min_synced_rpcs: 1, min_sum_soft_limit: 1_000, max_block_age: None, max_block_lag: None, diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index fcd8cdab..dc07a732 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -22,10 +22,12 @@ use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter}; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::json; +use std::cmp::Reverse; use std::fmt; use std::hash::{Hash, Hasher}; -use std::sync::atomic::{self, AtomicU64, AtomicUsize}; +use std::sync::atomic::{self, AtomicU64, AtomicU8, AtomicUsize}; use std::{cmp::Ordering, sync::Arc}; +use thread_fast_rng::rand::Rng; use tokio::sync::watch; use tokio::time::{sleep, sleep_until, timeout, Duration, Instant}; use url::Url; @@ -54,8 +56,6 @@ pub struct Web3Rpc { pub backup: bool, /// TODO: have an enum for this so that "no limit" prints pretty? pub(super) block_data_limit: AtomicU64, - /// Lower tiers are higher priority when sending requests - pub(super) tier: u64, /// TODO: change this to a watch channel so that http providers can subscribe and take action on change. /// this is only inside an Option so that the "Default" derive works. it will always be set. pub(super) head_block: Option>>, @@ -64,6 +64,8 @@ pub struct Web3Rpc { /// Track peak request latency /// This is only inside an Option so that the "Default" derive works. it will always be set. pub(super) peak_latency: Option, + /// Automatically set priority + pub(super) tier: AtomicU8, /// Track total requests served /// TODO: maybe move this to graphana pub(super) total_requests: AtomicUsize, @@ -196,7 +198,6 @@ impl Web3Rpc { name, peak_latency: Some(peak_latency), soft_limit: config.soft_limit, - tier: config.tier, ws_provider, disconnect_watch: Some(disconnect_watch), ..Default::default() @@ -219,7 +220,61 @@ impl Web3Rpc { Ok((new_connection, handle)) } - pub fn peak_ewma(&self) -> OrderedFloat { + /// sort by... + /// - backups last + /// - tier (ascending) + /// - block number (descending) + /// TODO: tests on this! + /// TODO: should tier or block number take priority? + /// TODO: should this return a struct that implements sorting traits? + fn sort_on(&self, max_block: Option) -> (bool, u8, Reverse) { + let mut head_block = self + .head_block + .as_ref() + .and_then(|x| x.borrow().as_ref().map(|x| *x.number())) + .unwrap_or_default(); + + if let Some(max_block) = max_block { + head_block = head_block.min(max_block); + } + + let tier = self.tier.load(atomic::Ordering::Relaxed); + + let backup = self.backup; + + (!backup, tier, Reverse(head_block)) + } + + pub fn sort_for_load_balancing_on( + &self, + max_block: Option, + ) -> ((bool, u8, Reverse), OrderedFloat) { + let sort_on = self.sort_on(max_block); + + let weighted_peak_ewma_seconds = self.weighted_peak_ewma_seconds(); + + let x = (sort_on, weighted_peak_ewma_seconds); + + trace!("sort_for_load_balancing {}: {:?}", self, x); + + x + } + + /// like sort_for_load_balancing, but shuffles tiers randomly instead of sorting by weighted_peak_ewma_seconds + pub fn shuffle_for_load_balancing_on( + &self, + max_block: Option, + ) -> ((bool, u8, Reverse), u32) { + let sort_on = self.sort_on(max_block); + + let mut rng = thread_fast_rng::thread_fast_rng(); + + let r = rng.gen::(); + + (sort_on, r) + } + + pub fn weighted_peak_ewma_seconds(&self) -> OrderedFloat { let peak_latency = if let Some(peak_latency) = self.peak_latency.as_ref() { peak_latency.latency().as_secs_f64() } else { @@ -538,7 +593,7 @@ impl Web3Rpc { // TODO: how often? different depending on the chain? // TODO: reset this timeout when a new block is seen? we need to keep request_latency updated though - let health_sleep_seconds = 10; + let health_sleep_seconds = 5; // health check loop let f = async move { @@ -550,7 +605,7 @@ impl Web3Rpc { while !rpc.should_disconnect() { new_total_requests = rpc.total_requests.load(atomic::Ordering::Relaxed); - if new_total_requests - old_total_requests < 10 { + if new_total_requests - old_total_requests < 5 { // TODO: if this fails too many times, reset the connection // TODO: move this into a function and the chaining should be easier if let Err(err) = rpc.healthcheck(error_handler).await { @@ -900,18 +955,20 @@ impl Web3Rpc { impl Hash for Web3Rpc { fn hash(&self, state: &mut H) { - self.name.hash(state); - self.display_name.hash(state); - self.http_provider.as_ref().map(|x| x.url()).hash(state); - // TODO: figure out how to get the url for the provider - // TODO: url does NOT include the authorization data. i think created_at should protect us if auth changes without anything else - // self.ws_provider.map(|x| x.url()).hash(state); - self.automatic_block_limit.hash(state); + // do not include automatic block limit because it can change + // do not include tier because it can change self.backup.hash(state); + self.created_at.hash(state); + self.display_name.hash(state); + self.name.hash(state); + + // TODO: url does NOT include the authorization data. i think created_at should protect us if auth changes without anything else + self.http_provider.as_ref().map(|x| x.url()).hash(state); + // TODO: figure out how to get the url for the ws provider + // self.ws_provider.map(|x| x.url()).hash(state); + // TODO: don't include soft_limit if we change them to be dynamic self.soft_limit.hash(state); - self.tier.hash(state); - self.created_at.hash(state); } } @@ -988,7 +1045,7 @@ impl Serialize for Web3Rpc { &self.peak_latency.as_ref().unwrap().latency().as_millis(), )?; - state.serialize_field("peak_ewma_s", self.peak_ewma().as_ref())?; + state.serialize_field("peak_ewma_s", self.weighted_peak_ewma_seconds().as_ref())?; state.end() } @@ -1047,7 +1104,6 @@ mod tests { automatic_block_limit: false, backup: false, block_data_limit: block_data_limit.into(), - tier: 0, head_block: Some(tx), ..Default::default() }; @@ -1082,7 +1138,6 @@ mod tests { automatic_block_limit: false, backup: false, block_data_limit: block_data_limit.into(), - tier: 0, head_block: Some(tx), ..Default::default() }; diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs index 97dff7be..dd0dc37b 100644 --- a/web3_proxy/src/stats/influxdb_queries.rs +++ b/web3_proxy/src/stats/influxdb_queries.rs @@ -228,7 +228,10 @@ pub async fn query_user_stats<'a>( let raw_influx_responses: Vec = influxdb_client .query_raw(Some(query.clone())) .await - .context("failed parsing query result into a FluxRecord")?; + .context(format!( + "failed parsing query result into a FluxRecord. Query={:?}", + query + ))?; // Basically rename all items to be "total", // calculate number of "archive_needed" and "error_responses" through their boolean representations ... @@ -237,30 +240,28 @@ pub async fn query_user_stats<'a>( // TODO: I must be able to probably zip the balance query... let datapoints = raw_influx_responses .into_iter() - // .into_values() .map(|x| x.values) .map(|value_map| { // Unwrap all relevant numbers - // BTreeMap - let mut out: HashMap = HashMap::new(); + let mut out: HashMap<&str, serde_json::Value> = HashMap::new(); value_map.into_iter().for_each(|(key, value)| { if key == "_measurement" { match value { influxdb2_structmap::value::Value::String(inner) => { if inner == "opt_in_proxy" { out.insert( - "collection".to_owned(), + "collection", serde_json::Value::String("opt-in".to_owned()), ); } else if inner == "global_proxy" { out.insert( - "collection".to_owned(), + "collection", serde_json::Value::String("global".to_owned()), ); } else { warn!("Some datapoints are not part of any _measurement!"); out.insert( - "collection".to_owned(), + "collection", serde_json::Value::String("unknown".to_owned()), ); } @@ -272,10 +273,7 @@ pub async fn query_user_stats<'a>( } else if key == "_stop" { match value { influxdb2_structmap::value::Value::TimeRFC(inner) => { - out.insert( - "stop_time".to_owned(), - serde_json::Value::String(inner.to_string()), - ); + out.insert("stop_time", serde_json::Value::String(inner.to_string())); } _ => { error!("_stop should always be a TimeRFC!"); @@ -284,10 +282,7 @@ pub async fn query_user_stats<'a>( } else if key == "_time" { match value { influxdb2_structmap::value::Value::TimeRFC(inner) => { - out.insert( - "time".to_owned(), - serde_json::Value::String(inner.to_string()), - ); + out.insert("time", serde_json::Value::String(inner.to_string())); } _ => { error!("_stop should always be a TimeRFC!"); @@ -297,7 +292,7 @@ pub async fn query_user_stats<'a>( match value { influxdb2_structmap::value::Value::Long(inner) => { out.insert( - "total_backend_requests".to_owned(), + "total_backend_requests", serde_json::Value::Number(inner.into()), ); } @@ -308,7 +303,7 @@ pub async fn query_user_stats<'a>( } else if key == "balance" { match value { influxdb2_structmap::value::Value::Double(inner) => { - out.insert("balance".to_owned(), json!(f64::from(inner))); + out.insert("balance", json!(f64::from(inner))); } _ => { error!("balance should always be a Double!"); @@ -317,10 +312,7 @@ pub async fn query_user_stats<'a>( } else if key == "cache_hits" { match value { influxdb2_structmap::value::Value::Long(inner) => { - out.insert( - "total_cache_hits".to_owned(), - serde_json::Value::Number(inner.into()), - ); + out.insert("total_cache_hits", serde_json::Value::Number(inner.into())); } _ => { error!("cache_hits should always be a Long!"); @@ -330,7 +322,7 @@ pub async fn query_user_stats<'a>( match value { influxdb2_structmap::value::Value::Long(inner) => { out.insert( - "total_cache_misses".to_owned(), + "total_cache_misses", serde_json::Value::Number(inner.into()), ); } @@ -342,7 +334,7 @@ pub async fn query_user_stats<'a>( match value { influxdb2_structmap::value::Value::Long(inner) => { out.insert( - "total_frontend_requests".to_owned(), + "total_frontend_requests", serde_json::Value::Number(inner.into()), ); } @@ -353,10 +345,7 @@ pub async fn query_user_stats<'a>( } else if key == "no_servers" { match value { influxdb2_structmap::value::Value::Long(inner) => { - out.insert( - "no_servers".to_owned(), - serde_json::Value::Number(inner.into()), - ); + out.insert("no_servers", serde_json::Value::Number(inner.into())); } _ => { error!("no_servers should always be a Long!"); @@ -365,7 +354,7 @@ pub async fn query_user_stats<'a>( } else if key == "sum_credits_used" { match value { influxdb2_structmap::value::Value::Double(inner) => { - out.insert("total_credits_used".to_owned(), json!(f64::from(inner))); + out.insert("total_credits_used", json!(f64::from(inner))); } _ => { error!("sum_credits_used should always be a Double!"); @@ -375,7 +364,7 @@ pub async fn query_user_stats<'a>( match value { influxdb2_structmap::value::Value::Long(inner) => { out.insert( - "total_request_bytes".to_owned(), + "total_request_bytes", serde_json::Value::Number(inner.into()), ); } @@ -387,7 +376,7 @@ pub async fn query_user_stats<'a>( match value { influxdb2_structmap::value::Value::Long(inner) => { out.insert( - "total_response_bytes".to_owned(), + "total_response_bytes", serde_json::Value::Number(inner.into()), ); } @@ -399,7 +388,7 @@ pub async fn query_user_stats<'a>( match value { influxdb2_structmap::value::Value::String(inner) => { out.insert( - "rpc_key".to_owned(), + "rpc_key", serde_json::Value::String( rpc_key_id_to_key.get(&inner).unwrap().to_string(), ), @@ -413,7 +402,7 @@ pub async fn query_user_stats<'a>( match value { influxdb2_structmap::value::Value::Long(inner) => { out.insert( - "total_response_millis".to_owned(), + "total_response_millis", serde_json::Value::Number(inner.into()), ); } @@ -426,7 +415,7 @@ pub async fn query_user_stats<'a>( else if stat_response_type == StatType::Detailed && key == "method" { match value { influxdb2_structmap::value::Value::String(inner) => { - out.insert("method".to_owned(), serde_json::Value::String(inner)); + out.insert("method", serde_json::Value::String(inner)); } _ => { error!("method should always be a String!"); @@ -435,7 +424,7 @@ pub async fn query_user_stats<'a>( } else if key == "chain_id" { match value { influxdb2_structmap::value::Value::String(inner) => { - out.insert("chain_id".to_owned(), serde_json::Value::String(inner)); + out.insert("chain_id", serde_json::Value::String(inner)); } _ => { error!("chain_id should always be a String!"); @@ -445,7 +434,7 @@ pub async fn query_user_stats<'a>( match value { influxdb2_structmap::value::Value::String(inner) => { out.insert( - "archive_needed".to_owned(), + "archive_needed", if inner == "true" { serde_json::Value::Bool(true) } else if inner == "false" { @@ -463,7 +452,7 @@ pub async fn query_user_stats<'a>( match value { influxdb2_structmap::value::Value::String(inner) => { out.insert( - "error_response".to_owned(), + "error_response", if inner == "true" { serde_json::Value::Bool(true) } else if inner == "false" { @@ -515,9 +504,6 @@ pub async fn query_user_stats<'a>( } let response = Json(json!(response_body)).into_response(); - // Add the requests back into out - - // TODO: Now impplement the proper response type Ok(response) } diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index 6409aead..8b1740f2 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -605,8 +605,7 @@ impl BufferedRpcQueryStats { // ================== let sender_latest_balance = match NonZeroU64::try_from(sender_rpc_entity.user_id) { Err(_) => Err(Web3ProxyError::BadResponse( - "Balance is not positive, although it was previously checked to be as such!" - .to_string(), + "Balance is not positive, although it was previously checked to be as such!".into(), )), // We don't do an get_or_insert, because technically we don't have the most up to date balance // Also let's keep things simple in terms of writing and getting. A single place writes it, multiple places can remove / poll it