From cae034afb325f5b4a779ce2b83c413c05ef8b895 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 8 Sep 2022 21:01:36 +0000 Subject: [PATCH] better metrics and spawn --- Cargo.lock | 239 +++++++++++++++++----- web3_proxy/Cargo.toml | 5 +- web3_proxy/examples/metrics.rs | 31 +++ web3_proxy/src/app.rs | 60 ++++-- web3_proxy/src/bin/curve_api_checks.rs | 7 + web3_proxy/src/bin/web3_proxy.rs | 13 +- web3_proxy/src/config.rs | 2 + web3_proxy/src/frontend/errors.rs | 2 + web3_proxy/src/frontend/http.rs | 33 +-- web3_proxy/src/frontend/rate_limit.rs | 7 +- web3_proxy/src/frontend/rpc_proxy_http.rs | 85 ++++---- web3_proxy/src/frontend/rpc_proxy_ws.rs | 9 +- web3_proxy/src/lib.rs | 2 +- web3_proxy/src/metrics.rs | 58 ++++++ web3_proxy/src/rpcs/blockchain.rs | 4 +- web3_proxy/src/rpcs/request.rs | 9 +- web3_proxy/src/stats.rs | 115 ----------- 17 files changed, 408 insertions(+), 273 deletions(-) create mode 100644 web3_proxy/examples/metrics.rs create mode 100644 web3_proxy/src/bin/curve_api_checks.rs create mode 100644 web3_proxy/src/metrics.rs delete mode 100644 web3_proxy/src/stats.rs diff --git a/Cargo.lock b/Cargo.lock index fb35b64c..d4615872 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -142,6 +142,28 @@ dependencies = [ "term", ] +[[package]] +name = "aspect" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3927b415bba088539aaaf872d19752c7d00101a25ead1d123fcd7633f9c224d" +dependencies = [ + "aspect-weave", +] + +[[package]] +name = "aspect-weave" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea4f672ac5290272725e1453014af99a86d2c1712808d647f469bf9427519f41" +dependencies = [ + "indexmap", + "proc-macro2", + "quote", + "syn", + "synattra", +] + [[package]] name = "async-attributes" version = "1.1.2" @@ -307,6 +329,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "atomic" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b88d82667eca772c4aa12f0f1348b3ae643424c8876448f3f7bd5787032e234c" +dependencies = [ + "autocfg 1.1.0", +] + [[package]] name = "atomic-waker" version = "1.0.0" @@ -324,6 +355,40 @@ dependencies = [ "winapi", ] +[[package]] +name = "auto_enums" +version = "0.7.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe0dfe45d75158751e195799f47ea02e81f570aa24bc5ef999cdd9e888c4b5c3" +dependencies = [ + "auto_enums_core", + "auto_enums_derive", +] + +[[package]] +name = "auto_enums_core" +version = "0.7.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da47c46001293a2c4b744d731958be22cff408a2ab76e2279328f9713b1267b4" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "auto_enums_derive" +version = "0.7.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41aed1da83ecdc799503b7cb94da1b45a34d72b49caf40a61d9cf5b88ec07cfd" +dependencies = [ + "autocfg 1.1.0", + "derive_utils", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "auto_impl" version = "1.0.1" @@ -1213,6 +1278,17 @@ dependencies = [ "syn", ] +[[package]] +name = "derive_utils" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "532b4c15dccee12c7044f1fcad956e98410860b22231e44a3b827464797ca7bf" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "dialoguer" version = "0.8.0" @@ -1281,6 +1357,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "doc-comment" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" + [[package]] name = "dotenv" version = "0.15.0" @@ -1293,12 +1375,6 @@ version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56899898ce76aaf4a0f24d914c97ea6ed976d42fec6ad33fcbb0a1103e07b2b0" -[[package]] -name = "dtoa" -version = "1.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6053ff46b5639ceb91756a85a4c8914668393a03170efd79c8884a529d80656" - [[package]] name = "dunce" version = "1.0.2" @@ -1823,11 +1899,10 @@ checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" [[package]] name = "form_urlencoded" -version = "1.0.1" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fc25a87fa4fd2094bffb06925852034d90a17f0d1e05197d4956d3555752191" +checksum = "a9c384f161156f5260c24a097c56119f9be8c798586aecc13afbcbe7b7e26bf8" dependencies = [ - "matches", "percent-encoding", ] @@ -1860,6 +1935,12 @@ dependencies = [ "libc", ] +[[package]] +name = "ftoa" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca45aac12b6c561b6289bc68957cb1db3dccf870e1951d590202de5e24f1dd35" + [[package]] name = "fuchsia-cprng" version = "0.1.1" @@ -2142,6 +2223,20 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "hdrhistogram" +version = "7.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ea9fe3952d32674a14e0975009a3547af9ea364995b5ec1add2e23c2ae523ab" +dependencies = [ + "base64 0.13.0", + "byteorder", + "crossbeam-channel", + "flate2", + "nom", + "num-traits", +] + [[package]] name = "headers" version = "0.3.7" @@ -2310,11 +2405,10 @@ dependencies = [ [[package]] name = "idna" -version = "0.2.3" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8" +checksum = "e14ddfc70884202db2244c223200c204c2bda1bc6e0998d11b5e024d657209e6" dependencies = [ - "matches", "unicode-bidi", "unicode-normalization", ] @@ -2371,6 +2465,7 @@ checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e" dependencies = [ "autocfg 1.1.0", "hashbrown", + "serde", ] [[package]] @@ -2601,12 +2696,6 @@ dependencies = [ "regex-automata", ] -[[package]] -name = "matches" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" - [[package]] name = "matchit" version = "0.5.0" @@ -2637,6 +2726,36 @@ dependencies = [ "autocfg 1.1.0", ] +[[package]] +name = "metered" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17491527d2ceff20d00d02166bdd18e23056e7ced22b9a8bb0efdfd293f0441a" +dependencies = [ + "aspect", + "atomic", + "cfg-if", + "hdrhistogram", + "metered-macro", + "parking_lot 0.12.1", + "serde", +] + +[[package]] +name = "metered-macro" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5ef9d33baa693e2d449d069f6ef6eb549762ed0c0179976c45bd98f3aa4a4e1" +dependencies = [ + "aspect-weave", + "heck 0.4.0", + "indexmap", + "proc-macro2", + "quote", + "syn", + "synattra", +] + [[package]] name = "migration" version = "0.1.0" @@ -3079,9 +3198,9 @@ dependencies = [ [[package]] name = "percent-encoding" -version = "2.1.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" [[package]] name = "pest" @@ -3399,29 +3518,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "prometheus-client" -version = "0.18.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c473049631c233933d6286c88bbb7be30e62ec534cf99a9ae0079211f7fa603" -dependencies = [ - "dtoa 1.0.3", - "itoa 1.0.2", - "parking_lot 0.12.1", - "prometheus-client-derive-text-encode", -] - -[[package]] -name = "prometheus-client-derive-text-encode" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66a455fbcb954c1a7decf3c586e860fd7889cddf4b8e164be736dbac95a953cd" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "pulldown-cmark" version = "0.9.2" @@ -3633,7 +3729,7 @@ dependencies = [ "async-trait", "bytes", "combine", - "dtoa 0.4.8", + "dtoa", "futures-util", "itoa 0.4.8", "percent-encoding", @@ -4177,6 +4273,21 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_prometheus" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25fcd6131bac47a32328d1ba1ee15a27f8d91ab2e5920dba71dbe93d2648f6b1" +dependencies = [ + "ftoa", + "indexmap", + "itoa 0.4.8", + "lazy_static", + "regex", + "serde", + "snafu", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -4349,6 +4460,27 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" +[[package]] +name = "snafu" +version = "0.6.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eab12d3c261b2308b0d80c26fffb58d17eba81a4be97890101f416b478c79ca7" +dependencies = [ + "doc-comment", + "snafu-derive", +] + +[[package]] +name = "snafu-derive" +version = "0.6.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1508efa03c362e23817f96cde18abed596a25219a8b2c66e8db33c03543d315b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "socket2" version = "0.4.4" @@ -4624,6 +4756,17 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "synattra" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "378cd5695f9ef5a26668bb70e81a464e7de6144bac3f77f42d5fa596c690be63" +dependencies = [ + "auto_enums", + "proc-macro2", + "syn", +] + [[package]] name = "sync_wrapper" version = "0.1.1" @@ -5167,13 +5310,12 @@ checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" [[package]] name = "url" -version = "2.2.2" +version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a507c383b2d33b5fc35d1861e77e6b383d158b2da5e14fe51b83dfedf6fd578c" +checksum = "0d68c799ae75762b8c3fe375feb6600ef5602c883c5d21eb51c09f22b83c4643" dependencies = [ "form_urlencoded", "idna", - "matches", "percent-encoding", ] @@ -5378,6 +5520,7 @@ dependencies = [ "handlebars", "hashbrown", "http", + "metered", "migration", "moka", "notify", @@ -5385,7 +5528,6 @@ dependencies = [ "parking_lot 0.12.1", "petgraph", "proctitle", - "prometheus-client", "rand 0.8.5", "redis-rate-limit", "regex", @@ -5394,6 +5536,7 @@ dependencies = [ "sea-orm", "serde", "serde_json", + "serde_prometheus", "siwe", "time 0.3.14", "tokio", diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 2bab0816..af2c0cf1 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -34,13 +34,13 @@ flume = "0.10.14" futures = { version = "0.3.24", features = ["thread-pool"] } hashbrown = { version = "0.12.3", features = ["serde"] } http = "0.2.8" +metered = { version = "0.9.0", features = ["serialize"] } moka = { version = "0.9.4", default-features = false, features = ["future"] } notify = "5.0.0" num = "0.4.0" parking_lot = { version = "0.12.1", features = ["arc_lock"] } petgraph = "0.6.2" proctitle = "0.1.1" -prometheus-client = "0.18.0" rand = "0.8.5" # TODO: regex has several "perf" features that we might want to use regex = "1.6.0" @@ -51,6 +51,7 @@ siwe = "0.4.2" sea-orm = { version = "0.9.2", features = ["macros"] } serde = { version = "1.0.144", features = [] } serde_json = { version = "1.0.85", default-features = false, features = ["alloc", "raw_value"] } +serde_prometheus = "0.1.6" # TODO: make sure this time version matches siwe. PR to put this in their prelude time = "0.3.14" tokio = { version = "1.21.0", features = ["full", "tracing"] } @@ -64,5 +65,5 @@ tracing = "0.1.36" # TODO: tracing-subscriber has serde and serde_json features that we might want to use tracing-subscriber = { version = "0.3.15", features = ["env-filter", "parking_lot"] } ulid = { version = "1.0.0", features = ["serde"] } -url = "2.2.2" +url = "2.3.1" uuid = "1.1.2" diff --git a/web3_proxy/examples/metrics.rs b/web3_proxy/examples/metrics.rs new file mode 100644 index 00000000..74dbc95c --- /dev/null +++ b/web3_proxy/examples/metrics.rs @@ -0,0 +1,31 @@ +use metered::{metered, HitCount, Throughput}; +use serde::Serialize; + +#[derive(Default, Debug, Serialize)] +pub struct Biz { + metrics: BizMetrics, +} + +#[metered(registry = BizMetrics)] +impl Biz { + #[measure([HitCount, Throughput])] + pub fn biz(&self) { + let delay = std::time::Duration::from_millis(rand::random::() % 200); + std::thread::sleep(delay); + } +} + +fn main() { + let buz = Biz::default(); + + for _ in 0..100 { + buz.biz(); + } + + let mut globals = std::collections::HashMap::new(); + globals.insert("service", "web3_proxy_prometheus_example"); + + let serialized = serde_prometheus::to_string(&buz.metrics, Some("example"), globals).unwrap(); + + println!("{}", serialized); +} diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index 3070a7ea..eda8912c 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -9,7 +9,6 @@ use crate::jsonrpc::JsonRpcRequestEnum; use crate::rpcs::blockchain::{ArcBlock, BlockHashesMap, BlockId}; use crate::rpcs::connections::Web3Connections; use crate::rpcs::transactions::TxStatus; -use crate::stats::AppStats; use anyhow::Context; use axum::extract::ws::Message; use derive_more::From; @@ -20,6 +19,8 @@ use futures::future::{join_all, AbortHandle}; use futures::stream::FuturesUnordered; use futures::stream::StreamExt; use futures::Future; +use hashbrown::HashMap; +use metered::{metered, ErrorCount, HitCount, InFlight, ResponseTime, Throughput}; use migration::{Migrator, MigratorTrait}; use moka::future::Cache; use redis_rate_limit::bb8::PooledConnection; @@ -79,13 +80,14 @@ pub struct Web3ProxyApp { pending_tx_sender: broadcast::Sender, pub config: AppConfig, pub db_conn: Option, + /// prometheus metrics + metrics: Arc, /// store pending queries so that we don't send the same request to our backends multiple times pub total_queries: AtomicU64, /// store pending transactions that we've seen so that we don't send duplicates to subscribers pub pending_transactions: Cache, pub frontend_rate_limiter: Option, pub redis_pool: Option, - pub stats: AppStats, pub user_cache: Cache, } @@ -115,6 +117,7 @@ pub async fn flatten_handles( } /// Connect to the database and run migrations +#[instrument(skip_all)] pub async fn get_migrated_db( db_url: String, min_connections: u32, @@ -141,9 +144,9 @@ pub async fn get_migrated_db( Ok(db) } +#[metered(registry = Web3ProxyAppMetrics)] impl Web3ProxyApp { pub async fn spawn( - app_stats: AppStats, top_config: TopConfig, num_workers: u32, ) -> anyhow::Result<( @@ -321,7 +324,7 @@ impl Web3ProxyApp { frontend_rate_limiter, db_conn, redis_pool, - stats: app_stats, + metrics: Default::default(), user_cache, }; @@ -332,10 +335,22 @@ impl Web3ProxyApp { Ok((app, handle)) } - pub async fn eth_subscribe( - self: Arc, + pub fn prometheus_metrics(&self) -> anyhow::Result { + let globals = HashMap::new(); + // TODO: what globals? should this be the hostname or what? + // globals.insert("service", "web3_proxy"); + + let serialized = serde_prometheus::to_string(&self.metrics, Some("web3_proxy"), globals)?; + + Ok(serialized) + } + + #[instrument(skip_all)] + #[measure([ErrorCount, HitCount, InFlight, ResponseTime, Throughput])] + pub async fn eth_subscribe<'a>( + self: &'a Arc, payload: JsonRpcRequest, - subscription_count: &AtomicUsize, + subscription_count: &'a AtomicUsize, // TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now response_sender: flume::Sender, ) -> anyhow::Result<(AbortHandle, JsonRpcForwardedResponse)> { @@ -524,7 +539,7 @@ impl Web3ProxyApp { /// send the request or batch of requests to the approriate RPCs #[instrument(skip_all)] pub async fn proxy_web3_rpc( - &self, + self: &Arc, request: JsonRpcRequestEnum, ) -> anyhow::Result { // TODO: this should probably be trace level @@ -550,14 +565,14 @@ impl Web3ProxyApp { Ok(response) } + /// cut up the request and send to potentually different servers + /// TODO: make sure this isn't a problem + #[instrument(skip_all)] async fn proxy_web3_rpc_requests( - &self, + self: &Arc, requests: Vec, ) -> anyhow::Result> { // TODO: we should probably change ethers-rs to support this directly - // we cut up the request and send to potentually different servers. this could be a problem. - // if the client needs consistent blocks, they should specify instead of assume batches work on the same - // TODO: is spawning here actually slower? let num_requests = requests.len(); let responses = join_all( requests @@ -576,6 +591,7 @@ impl Web3ProxyApp { Ok(collected) } + #[instrument(skip_all)] pub async fn redis_conn(&self) -> anyhow::Result> { match self.redis_pool.as_ref() { None => Err(anyhow::anyhow!("no redis server configured")), @@ -587,8 +603,10 @@ impl Web3ProxyApp { } } + #[measure([ErrorCount, HitCount, InFlight, ResponseTime, Throughput])] + #[instrument(skip_all)] async fn proxy_web3_rpc_request( - &self, + self: &Arc, mut request: JsonRpcRequest, ) -> anyhow::Result { trace!("Received request: {:?}", request); @@ -609,7 +627,7 @@ impl Web3ProxyApp { // TODO: don't clone let partial_response: serde_json::Value = match request.method.clone().as_ref() { // lots of commands are blocked - "admin_addPeer" + method @ ("admin_addPeer" | "admin_datadir" | "admin_startRPC" | "admin_startWS" @@ -671,20 +689,20 @@ impl Web3ProxyApp { | "shh_newIdentity" | "shh_post" | "shh_uninstallFilter" - | "shh_version" => { + | "shh_version") => { // TODO: client error stat // TODO: proper error code - return Err(anyhow::anyhow!("unsupported")); + return Err(anyhow::anyhow!("method unsupported: {}", method)); } // TODO: implement these commands - "eth_getFilterChanges" + method @ ("eth_getFilterChanges" | "eth_getFilterLogs" | "eth_newBlockFilter" | "eth_newFilter" | "eth_newPendingTransactionFilter" - | "eth_uninstallFilter" => { + | "eth_uninstallFilter") => { // TODO: unsupported command stat - return Err(anyhow::anyhow!("not yet implemented")); + return Err(anyhow::anyhow!("not yet implemented: {}", method)); } // some commands can use local data or caches "eth_accounts" => { @@ -698,7 +716,9 @@ impl Web3ProxyApp { } None => { // TODO: what does geth do if this happens? - return Err(anyhow::anyhow!("no servers synced")); + return Err(anyhow::anyhow!( + "no servers synced. unknown eth_blockNumber" + )); } } } diff --git a/web3_proxy/src/bin/curve_api_checks.rs b/web3_proxy/src/bin/curve_api_checks.rs new file mode 100644 index 00000000..25f47375 --- /dev/null +++ b/web3_proxy/src/bin/curve_api_checks.rs @@ -0,0 +1,7 @@ +fn main() -> anyhow::Result<()> { + // get curve-api and rpc endpoints from cli flags + // do simple checks for proxy version, blockNum, a simple balance call + // in parallel check all the curve api endpoints that we expect to pass. many require multiple chains that we don't yet run locally + + Ok(()) +} diff --git a/web3_proxy/src/bin/web3_proxy.rs b/web3_proxy/src/bin/web3_proxy.rs index 0cab134d..ea849cac 100644 --- a/web3_proxy/src/bin/web3_proxy.rs +++ b/web3_proxy/src/bin/web3_proxy.rs @@ -18,8 +18,7 @@ use tracing::{debug, info}; use tracing_subscriber::EnvFilter; use web3_proxy::app::{flatten_handle, Web3ProxyApp}; use web3_proxy::config::{CliConfig, TopConfig}; -use web3_proxy::frontend; -use web3_proxy::stats::AppStatsRegistry; +use web3_proxy::{frontend, metrics}; fn run( shutdown_receiver: flume::Receiver<()>, @@ -69,18 +68,14 @@ fn run( debug!(?num_workers); rt.block_on(async { - let app_stats_registry = AppStatsRegistry::new(); - - let app_stats = app_stats_registry.stats.clone(); - let app_frontend_port = cli_config.port; let app_prometheus_port = cli_config.prometheus_port; - let (app, app_handle) = Web3ProxyApp::spawn(app_stats, top_config, num_workers).await?; + let (app, app_handle) = Web3ProxyApp::spawn(top_config, num_workers).await?; - let frontend_handle = tokio::spawn(frontend::serve(app_frontend_port, app)); + let frontend_handle = tokio::spawn(frontend::serve(app_frontend_port, app.clone())); - let prometheus_handle = tokio::spawn(app_stats_registry.serve(app_prometheus_port)); + let prometheus_handle = tokio::spawn(metrics::serve(app, app_prometheus_port)); // if everything is working, these should both run forever // TODO: try_join these instead? use signal_shutdown here? diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index a3a713a5..03b23c7d 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -8,6 +8,7 @@ use hashbrown::HashMap; use serde::Deserialize; use std::sync::Arc; use tokio::sync::broadcast; +use tracing::instrument; pub type BlockAndRpc = (Option, Arc); pub type TxHashAndRpc = (TxHash, Arc); @@ -113,6 +114,7 @@ impl Web3ConnectionConfig { /// Create a Web3Connection from config /// TODO: move this into Web3Connection (just need to make things pub(crate)) #[allow(clippy::too_many_arguments)] + #[instrument(skip_all)] pub async fn spawn( self, name: String, diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index 674ae796..542c324e 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -9,6 +9,7 @@ use redis_rate_limit::{bb8::RunError, RedisError}; use sea_orm::DbErr; use serde_json::value::RawValue; use std::error::Error; +use tracing::instrument; // TODO: take "IntoResult" instead? pub type FrontendResult = Result; @@ -51,6 +52,7 @@ impl IntoResponse for FrontendErrorResponse { } } +#[instrument(skip_all)] pub async fn handler_404() -> Response { let err = anyhow::anyhow!("nothing to see here"); diff --git a/web3_proxy/src/frontend/http.rs b/web3_proxy/src/frontend/http.rs index 1a909ac3..8ecb77fe 100644 --- a/web3_proxy/src/frontend/http.rs +++ b/web3_proxy/src/frontend/http.rs @@ -3,8 +3,10 @@ use axum::{http::StatusCode, response::IntoResponse, Extension, Json}; use moka::future::ConcurrentCacheExt; use serde_json::json; use std::sync::{atomic, Arc}; +use tracing::instrument; /// Health check page for load balancers to use +#[instrument(skip_all)] pub async fn health(Extension(app): Extension>) -> impl IntoResponse { if app.balanced_rpcs.synced() { (StatusCode::OK, "OK") @@ -15,20 +17,21 @@ pub async fn health(Extension(app): Extension>) -> impl IntoRe /// Very basic status page /// TODO: replace this with proper stats and monitoring +#[instrument(skip_all)] pub async fn status(Extension(app): Extension>) -> impl IntoResponse { - // TODO: what else should we include? uptime? - app.pending_transactions.sync(); - app.user_cache.sync(); - - let body = json!({ - "total_queries": app.total_queries.load(atomic::Ordering::Acquire), - "pending_transactions_count": app.pending_transactions.entry_count(), - "pending_transactions_size": app.pending_transactions.weighted_size(), - "user_cache_count": app.user_cache.entry_count(), - "user_cache_size": app.user_cache.weighted_size(), - "balanced_rpcs": app.balanced_rpcs, - "private_rpcs": app.private_rpcs, - }); - - Json(body) + // // TODO: what else should we include? uptime? + // app.pending_transactions.sync(); + // app.user_cache.sync(); + // let body = json!({ + // "total_queries": app.total_queries.load(atomic::Ordering::Acquire), + // "pending_transactions_count": app.pending_transactions.entry_count(), + // "pending_transactions_size": app.pending_transactions.weighted_size(), + // "user_cache_count": app.user_cache.entry_count(), + // "user_cache_size": app.user_cache.weighted_size(), + // "balanced_rpcs": app.balanced_rpcs, + // "private_rpcs": app.private_rpcs, + // }); + // Json(body) + // TODO: only expose this on a different port + app.prometheus_metrics().unwrap() } diff --git a/web3_proxy/src/frontend/rate_limit.rs b/web3_proxy/src/frontend/rate_limit.rs index 593ba8ac..c1ee2645 100644 --- a/web3_proxy/src/frontend/rate_limit.rs +++ b/web3_proxy/src/frontend/rate_limit.rs @@ -137,7 +137,7 @@ impl Web3ProxyApp { } Ok(ThrottleResult::RetryNever) => { // TODO: prettier error for the user - return Err(anyhow::anyhow!("ip blocked by rate limiter")); + return Err(anyhow::anyhow!("ip ({}) blocked by rate limiter", ip)); } Err(err) => { // internal error, not rate limit being hit @@ -259,7 +259,10 @@ impl Web3ProxyApp { } Ok(ThrottleResult::RetryNever) => { // TODO: prettier error for the user - return Err(anyhow::anyhow!("user blocked by rate limiter")); + return Err(anyhow::anyhow!( + "user #{} blocked by rate limiter", + user_data.user_id + )); } Err(err) => { // internal error, not rate limit being hit diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs index 81628613..6e3bb79a 100644 --- a/web3_proxy/src/frontend/rpc_proxy_http.rs +++ b/web3_proxy/src/frontend/rpc_proxy_http.rs @@ -1,78 +1,67 @@ use super::errors::FrontendResult; use super::rate_limit::{rate_limit_by_ip, rate_limit_by_user_key}; -use crate::stats::Protocol; use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum}; -use axum::extract::Path; +use axum::extract::{Host, Path}; +use axum::headers::{Referer, UserAgent}; +use axum::TypedHeader; use axum::{response::IntoResponse, Extension, Json}; use axum_client_ip::ClientIp; use std::sync::Arc; -use tracing::{error_span, Instrument}; +use tracing::{debug_span, error_span, Instrument}; use uuid::Uuid; pub async fn public_proxy_web3_rpc( - Json(payload): Json, Extension(app): Extension>, + Host(host): Host, ClientIp(ip): ClientIp, + Json(payload): Json, + referer: Option>, + user_agent: Option>, ) -> FrontendResult { - let _ip = rate_limit_by_ip(&app, ip).await?; + let request_span = debug_span!("request", host, ?referer, ?user_agent); - let protocol = Protocol::HTTP; - let user_id = 0; + let ip = rate_limit_by_ip(&app, ip) + .instrument(request_span.clone()) + .await?; - let user_span = error_span!("user", user_id, ?protocol); + let user_span = error_span!("ip", %ip); - /* - // TODO: move this to a helper function (or two). have it fetch method, protocol, etc. from tracing? - match &payload { - JsonRpcRequestEnum::Batch(batch) => { - // TODO: use inc_by if possible? need to group them by rpc_method - for single in batch { - let rpc_method = single.method.clone(); + let f = tokio::spawn(async move { + app.proxy_web3_rpc(payload) + .instrument(request_span) + .instrument(user_span) + .await + }); - let _count = app - .stats - .proxy_requests - .get_or_create(&ProxyRequestLabels { - rpc_method, - protocol: protocol.clone(), - user_id, - }) - .inc(); - } - } - JsonRpcRequestEnum::Single(single) => { - let rpc_method = single.method.clone(); - - let _count = app - .stats - .proxy_requests - .get_or_create(&ProxyRequestLabels { - protocol, - rpc_method, - user_id, - }) - .inc(); - } - }; - */ - - let response = app.proxy_web3_rpc(payload).instrument(user_span).await?; + let response = f.await.unwrap()?; Ok(Json(&response).into_response()) } pub async fn user_proxy_web3_rpc( - Json(payload): Json, Extension(app): Extension>, + Host(host): Host, + Json(payload): Json, + user_agent: Option>, Path(user_key): Path, + referer: Option>, ) -> FrontendResult { - let user_id: u64 = rate_limit_by_user_key(&app, user_key).await?; + let request_span = debug_span!("request", host, ?referer, ?user_agent); - let protocol = Protocol::HTTP; + let user_id: u64 = rate_limit_by_user_key(&app, user_key) + .instrument(request_span.clone()) + .await?; - let user_span = error_span!("user", user_id, ?protocol); + let user_span = error_span!("user", user_id); - let response = app.proxy_web3_rpc(payload).instrument(user_span).await?; + let f = tokio::spawn(async move { + app.proxy_web3_rpc(payload) + .instrument(request_span) + .instrument(user_span) + .await + }); + + let response = f.await.unwrap()?; Ok(Json(&response).into_response()) } diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index 33419616..3ba72252 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -24,7 +24,6 @@ use uuid::Uuid; use crate::{ app::Web3ProxyApp, jsonrpc::{JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest}, - stats::Protocol, }; #[debug_handler] @@ -36,9 +35,8 @@ pub async fn public_websocket_handler( let _ip = rate_limit_by_ip(&app, ip).await?; let user_id = 0; - let protocol = Protocol::Websocket; - let user_span = error_span!("user", user_id, ?protocol); + let user_span = error_span!("user", user_id); match ws_upgrade { Some(ws) => Ok(ws @@ -59,11 +57,9 @@ pub async fn user_websocket_handler( ) -> FrontendResult { let user_id: u64 = rate_limit_by_user_key(&app, user_key).await?; - let protocol = Protocol::Websocket; - // log the id, not the address. we don't want to expose the user's address // TODO: type that wraps Address and have it censor? would protect us from accidently logging addresses - let user_span = error_span!("user", user_id, ?protocol); + let user_span = error_span!("user", user_id); match ws_upgrade { Some(ws_upgrade) => Ok(ws_upgrade @@ -118,7 +114,6 @@ async fn handle_socket_payload( let span = error_span!("eth_subscribe"); let response = app - .clone() .eth_subscribe(payload, subscription_count, response_sender.clone()) .instrument(span) .await; diff --git a/web3_proxy/src/lib.rs b/web3_proxy/src/lib.rs index dbcbee5c..a2da1712 100644 --- a/web3_proxy/src/lib.rs +++ b/web3_proxy/src/lib.rs @@ -3,6 +3,6 @@ pub mod block_number; pub mod config; pub mod frontend; pub mod jsonrpc; +pub mod metrics; pub mod rpcs; -pub mod stats; pub mod users; diff --git a/web3_proxy/src/metrics.rs b/web3_proxy/src/metrics.rs new file mode 100644 index 00000000..ca2c0dab --- /dev/null +++ b/web3_proxy/src/metrics.rs @@ -0,0 +1,58 @@ +use axum::headers::HeaderName; +use axum::http::HeaderValue; +use axum::response::{IntoResponse, Response}; +use axum::{routing::get, Extension, Router}; +use std::net::SocketAddr; +use std::sync::Arc; +use tracing::{info, instrument}; + +use crate::app::Web3ProxyApp; + +/// Run a prometheus metrics server on the given port. +#[instrument(skip_all)] +pub async fn serve(app: Arc, port: u16) -> anyhow::Result<()> { + // build our application with a route + // order most to least common + // TODO: 404 any unhandled routes? + let app = Router::new().route("/", get(root)).layer(Extension(app)); + + // run our app with hyper + // TODO: allow only listening on localhost? + let addr = SocketAddr::from(([0, 0, 0, 0], port)); + info!("prometheus listening on port {}", port); + // TODO: into_make_service is enough if we always run behind a proxy. make into_make_service_with_connect_info optional? + + /* + It sequentially looks for an IP in: + - x-forwarded-for header (de-facto standard) + - x-real-ip header + - forwarded header (new standard) + - axum::extract::ConnectInfo (if not behind proxy) + + So we probably won't need into_make_service_with_connect_info, but it shouldn't hurt + */ + let service = app.into_make_service_with_connect_info::(); + // let service = app.into_make_service(); + + // `axum::Server` is a re-export of `hyper::Server` + axum::Server::bind(&addr) + // TODO: option to use with_connect_info. we want it in dev, but not when running behind a proxy, but not + .serve(service) + .await + .map_err(Into::into) +} + +#[instrument(skip_all)] +async fn root(Extension(app): Extension>) -> Response { + let serialized = app.prometheus_metrics().unwrap(); + + let mut r = serialized.into_response(); + + // // TODO: is there an easier way to do this? + r.headers_mut().insert( + HeaderName::from_static("content-type"), + HeaderValue::from_static("application/openmetrics-text; version=1.0.0; charset=utf-8"), + ); + + r +} diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 23826670..28c1cfb6 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -14,7 +14,7 @@ use serde::Serialize; use serde_json::json; use std::{cmp::Ordering, fmt::Display, sync::Arc}; use tokio::sync::{broadcast, watch}; -use tracing::{debug, info, trace, warn}; +use tracing::{debug, trace, warn}; // TODO: type for Hydrated Blocks with their full transactions? pub type ArcBlock = Arc>; @@ -401,7 +401,7 @@ impl Web3Connections { } else { // TODO: this is too verbose. move to trace // i think "conns" is somehow getting dupes - debug!(?heavy_rpcs); + trace!(?heavy_rpcs); // success! this block has enough soft limit and nodes on it (or on later blocks) let conns: Vec> = heavy_rpcs diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 42c48766..b7dd7804 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -1,10 +1,10 @@ use super::connection::Web3Connection; use super::provider::Web3Provider; +// use metered::{measure, ErrorCount, HitCount, InFlight, ResponseTime, Throughput}; use std::fmt; use std::sync::atomic; use std::sync::Arc; use tokio::time::{sleep, Duration, Instant}; -use tracing::debug; use tracing::warn; use tracing::{instrument, trace}; @@ -45,6 +45,7 @@ impl OpenRequestHandle { /// Send a web3 request /// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented /// By taking self here, we ensure that this is dropped after the request is complete + // #[measure([ErrorCount, HitCount, InFlight, ResponseTime, Throughput])] #[instrument(skip_all)] pub async fn request( &self, @@ -58,7 +59,7 @@ impl OpenRequestHandle { // TODO: use tracing spans properly // TODO: requests from customers have request ids, but we should add // TODO: including params in this is way too verbose - debug!("Sending {} to {}", method, self.0); + trace!(rpc=%self.0, %method, "request"); let mut provider = None; @@ -81,8 +82,8 @@ impl OpenRequestHandle { // TODO: i think ethers already has trace logging (and does it much more fancy) // TODO: at least instrument this with more useful information - trace!("Reply from {} for {}: {:?}", self.0, method, response); - // trace!("Reply from {}", self.0); + // trace!(rpc=%self.0, %method, ?response); + trace!(rpc=%self.0, %method, "response"); response } diff --git a/web3_proxy/src/stats.rs b/web3_proxy/src/stats.rs deleted file mode 100644 index e8de4943..00000000 --- a/web3_proxy/src/stats.rs +++ /dev/null @@ -1,115 +0,0 @@ -use axum::headers::HeaderName; -use axum::http::HeaderValue; -use axum::response::{IntoResponse, Response}; -use axum::{routing::get, Extension, Router}; -use prometheus_client::encoding::text::encode; -use prometheus_client::encoding::text::Encode; -use prometheus_client::metrics::counter::Counter; -use prometheus_client::metrics::family::Family; -use prometheus_client::registry::Registry; -use std::net::SocketAddr; -use std::sync::Arc; -use tracing::info; - -#[derive(Clone, Hash, PartialEq, Eq, Encode)] -pub struct ProxyRequestLabels { - /// GET is websocket, POST is http - pub protocol: Protocol, - /// jsonrpc 2.0 method - pub rpc_method: String, - /// anonymous is user 0 - pub user_id: u64, -} - -#[derive(Clone, Debug, Hash, PartialEq, Eq, Encode)] -pub enum Protocol { - HTTP, - Websocket, -} - -pub struct AppStatsRegistry { - pub registry: Registry, - pub stats: AppStats, -} - -#[derive(Clone)] -pub struct AppStats { - pub proxy_requests: Family, -} - -impl AppStatsRegistry { - pub fn new() -> Arc { - // Note the angle brackets to make sure to use the default (dynamic - // dispatched boxed metric) for the generic type parameter. - let mut registry = ::default(); - - // stats for GET and POST - let proxy_requests = Family::::default(); - registry.register( - // With the metric name. - "http_requests", - // And the metric help text. - "Number of HTTP requests received", - Box::new(proxy_requests.clone()), - ); - - let new = Self { - registry, - stats: AppStats { proxy_requests }, - }; - - Arc::new(new) - } - - pub async fn serve(self: Arc, port: u16) -> anyhow::Result<()> { - // build our application with a route - // order most to least common - // TODO: 404 any unhandled routes? - let app = Router::new() - .route("/", get(root)) - .layer(Extension(self.clone())); - - // run our app with hyper - // TODO: allow only listening on localhost? - let addr = SocketAddr::from(([0, 0, 0, 0], port)); - info!("prometheus listening on port {}", port); - // TODO: into_make_service is enough if we always run behind a proxy. make into_make_service_with_connect_info optional? - - /* - It sequentially looks for an IP in: - - x-forwarded-for header (de-facto standard) - - x-real-ip header - - forwarded header (new standard) - - axum::extract::ConnectInfo (if not behind proxy) - - So we probably won't need into_make_service_with_connect_info, but it shouldn't hurt - */ - let service = app.into_make_service_with_connect_info::(); - // let service = app.into_make_service(); - - // `axum::Server` is a re-export of `hyper::Server` - axum::Server::bind(&addr) - // TODO: option to use with_connect_info. we want it in dev, but not when running behind a proxy, but not - .serve(service) - .await - .map_err(Into::into) - } -} - -async fn root(Extension(stats_registry): Extension>) -> Response { - let mut buffer = vec![]; - - encode(&mut buffer, &stats_registry.registry).unwrap(); - - let s = String::from_utf8(buffer).unwrap(); - - let mut r = s.into_response(); - - // // TODO: is there an easier way to do this? - r.headers_mut().insert( - HeaderName::from_static("content-type"), - HeaderValue::from_static("application/openmetrics-text; version=1.0.0; charset=utf-8"), - ); - - r -}