From 315b751dc7b307b88125cb4190ded5b06544aa27 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 17 May 2022 16:23:27 +0000 Subject: [PATCH] add tokio-console --- Cargo.lock | 325 ++++++++++++++++++++++++++++++++++ web3-proxy/Cargo.toml | 3 +- web3-proxy/src/app.rs | 22 ++- web3-proxy/src/connection.rs | 4 + web3-proxy/src/connections.rs | 65 ++++--- web3-proxy/src/main.rs | 5 +- 6 files changed, 387 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5f934ce0..b796a8e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -118,6 +118,27 @@ dependencies = [ "term", ] +[[package]] +name = "async-stream" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dad5c83079eae9969be7fadefe640a1c566901f05ff91ab221de4b6f68d9507e" +dependencies = [ + "async-stream-impl", + "futures-core", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10f203db73a71dfa2fb6dd22763990fa26f3d2625a6da2da900d23b87d26be27" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-trait" version = "0.1.53" @@ -169,6 +190,49 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "axum" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab2504b827a8bef941ba3dd64bdffe9cf56ca182908a147edd6189c95fbcae7d" +dependencies = [ + "async-trait", + "axum-core", + "bitflags", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa 1.0.1", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "serde", + "sync_wrapper", + "tokio", + "tower", + "tower-http", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da31c0ed7b4690e2c78fe4b880d21cd7db04a346ebc658b4270251b695437f17" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", +] + [[package]] name = "backtrace" version = "0.3.65" @@ -607,6 +671,43 @@ dependencies = [ "winapi", ] +[[package]] +name = "console-api" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24cb05777feccbb2642d4f2df44d0505601a2cd88ca517d8c913f263a5a8dc8b" +dependencies = [ + "prost", + "prost-types", + "tonic", + "tracing-core", +] + +[[package]] +name = "console-subscriber" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f21a16ee925aa9d2bad2e296beffd6c5b1bfaad50af509d305b8e7f23af20fb" +dependencies = [ + "console-api", + "crossbeam-channel", + "crossbeam-utils", + "futures", + "hdrhistogram", + "humantime", + "parking_lot 0.11.2", + "prost-types", + "serde", + "serde_json", + "thread_local", + "tokio", + "tokio-stream", + "tonic", + "tracing", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "const-oid" version = "0.7.1" @@ -634,6 +735,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crc32fast" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +dependencies = [ + "cfg-if", +] + [[package]] name = "criterion" version = "0.3.5" @@ -1305,6 +1415,18 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "279fb028e20b3c4c320317955b77c5e0c9701f05a1d309905d6fc702cdc5053e" +[[package]] +name = "flate2" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b39522e96686d38f4bc984b9198e3a0613264abaebaff2c5c918bfa6b6da09af" +dependencies = [ + "cfg-if", + "crc32fast", + "libc", + "miniz_oxide", +] + [[package]] name = "flume" version = "0.10.12" @@ -1598,6 +1720,19 @@ dependencies = [ "hashbrown 0.12.1", ] +[[package]] +name = "hdrhistogram" +version = "7.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31672b7011be2c4f7456c4ddbcb40e7e9a4a9fad8efe49a6ebaf5f307d0109c0" +dependencies = [ + "base64 0.13.0", + "byteorder", + "flate2", + "nom", + "num-traits", +] + [[package]] name = "headers" version = "0.3.7" @@ -1703,6 +1838,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-range-header" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" + [[package]] name = "httparse" version = "1.7.1" @@ -1715,6 +1856,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "0.14.18" @@ -1752,6 +1899,18 @@ dependencies = [ "tokio-rustls", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "idna" version = "0.2.3" @@ -2007,12 +2166,27 @@ dependencies = [ "libc", ] +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata", +] + [[package]] name = "matches" version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" +[[package]] +name = "matchit" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb" + [[package]] name = "md-5" version = "0.10.1" @@ -2053,6 +2227,12 @@ dependencies = [ "unicase", ] +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.5.1" @@ -2124,6 +2304,16 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" +[[package]] +name = "nom" +version = "7.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8903e5a29a317527874d0402f867152a3d21c908bb0b933e416c65e301d4c36" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "nonzero_ext" version = "0.3.0" @@ -2598,6 +2788,39 @@ dependencies = [ "winapi", ] +[[package]] +name = "prost" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc03e116981ff7d8da8e5c220e374587b98d294af7ba7dd7fda761158f00086f" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b670f45da57fb8542ebdbb6105a925fe571b67f9e7ed9f47a06a84e72b4e7cc" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "prost-types" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d0a014229361011dc8e69c8a1ec6c2e8d0f2af7c91e3ea3f5b2170298461e68" +dependencies = [ + "bytes", + "prost", +] + [[package]] name = "quanta" version = "0.9.3" @@ -2740,6 +2963,9 @@ name = "regex-automata" version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax", +] [[package]] name = "regex-syntax" @@ -3311,6 +3537,12 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "sync_wrapper" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8" + [[package]] name = "tap" version = "1.0.1" @@ -3467,9 +3699,20 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", + "tracing", "winapi", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "1.7.0" @@ -3569,6 +3812,83 @@ dependencies = [ "serde", ] +[[package]] +name = "tonic" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5be9d60db39854b30b835107500cf0aca0b0d14d6e1c3de124217c23a29c2ddb" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.13.0", + "bytes", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "prost-derive", + "tokio", + "tokio-stream", + "tokio-util 0.7.1", + "tower", + "tower-layer", + "tower-service", + "tracing", + "tracing-futures", +] + +[[package]] +name = "tower" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a89fd63ad6adf737582df5db40d286574513c69a11dac5214dc3b5603d6713e" +dependencies = [ + "futures-core", + "futures-util", + "indexmap", + "pin-project", + "pin-project-lite", + "rand", + "slab", + "tokio", + "tokio-util 0.7.1", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-http" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d342c6d58709c0a6d48d48dabbb62d4ef955cf5f0f3bbfd845838e7ae88dbae" +dependencies = [ + "bitflags", + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "http-range-header", + "pin-project-lite", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62" + [[package]] name = "tower-service" version = "0.3.1" @@ -3637,10 +3957,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4bc28f93baff38037f64e6f43d34cfa1605f27a49c34e8a04c5e78b0babf2596" dependencies = [ "ansi_term", + "lazy_static", + "matchers", "parking_lot 0.12.0", + "regex", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", ] @@ -3966,6 +4290,7 @@ version = "0.1.0" dependencies = [ "anyhow", "argh", + "console-subscriber", "dashmap", "derive_more", "ethers", diff --git a/web3-proxy/Cargo.toml b/web3-proxy/Cargo.toml index d93b302b..5f436ed8 100644 --- a/web3-proxy/Cargo.toml +++ b/web3-proxy/Cargo.toml @@ -9,6 +9,7 @@ edition = "2021" anyhow = "1.0.57" argh = "0.1.7" # axum = "*" # TODO: use this instead of warp? +console-subscriber = { version = "0.1.5", features = ["parking_lot"] } dashmap = "5.3.3" derive_more = "0.99.17" ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "ws"] } @@ -28,7 +29,7 @@ reqwest = { version = "0.11.10", default-features = false, features = ["json", " rustc-hash = "1.1.0" serde = { version = "1.0.137", features = [] } serde_json = { version = "1.0.81", default-features = false, features = ["alloc", "raw_value"] } -tokio = { version = "1.18.2", features = ["full"] } +tokio = { version = "1.18.2", features = ["full", "tracing"] } toml = "0.5.9" tracing = "0.1.34" # TODO: tracing-subscriber has serde and serde_json features that we might want to use diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index e83b5e7f..06b47c89 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -15,6 +15,7 @@ use std::fmt; use std::sync::Arc; use std::time::Duration; use tokio::sync::watch; +use tokio::task; use tokio::time::sleep; use tracing::{debug, instrument, trace, warn}; @@ -140,7 +141,9 @@ impl Web3ProxyApp { .into_iter() .map(|request| { let clone = self.clone(); - tokio::spawn(async move { clone.proxy_web3_rpc_request(request).await }) + task::Builder::default() + .name("proxy_web3_rpc_request") + .spawn(async move { clone.proxy_web3_rpc_request(request).await }) }) .collect::>(), ) @@ -177,11 +180,18 @@ impl Web3ProxyApp { // TODO: benchmark this compared to waiting on unbounded futures // TODO: do something with this handle? - tokio::spawn(async move { - connections - .try_send_parallel_requests(active_request_handles, method, params, tx) - .await - }); + task::Builder::default() + .name("try_send_parallel_requests") + .spawn(async move { + connections + .try_send_parallel_requests( + active_request_handles, + method, + params, + tx, + ) + .await + }); // wait for the first response // TODO: we don't want the first response. we want the quorum response diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index 41f2d6c3..05cd1b86 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -12,6 +12,7 @@ use std::num::NonZeroU32; use std::sync::atomic::{self, AtomicU32}; use std::{cmp::Ordering, sync::Arc}; use tokio::sync::RwLock; +use tokio::task; use tokio::time::{interval, sleep, timeout_at, Duration, Instant, MissedTickBehavior}; use tracing::{info, instrument, trace, warn}; @@ -309,6 +310,9 @@ impl Web3Connection { { Ok(Some(new_block)) => { self.send_block(Ok(new_block), &block_sender).await; + + // TODO: really not sure about this + task::yield_now().await; } Ok(None) => { warn!("subscription ended"); diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index 7c512c28..fbe202dc 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -11,6 +11,7 @@ use serde_json::value::RawValue; use std::cmp; use std::fmt; use std::sync::Arc; +use tokio::task; use tracing::{info, instrument, trace, warn}; use crate::config::Web3ConnectionConfig; @@ -185,22 +186,24 @@ impl Web3Connections { // TODO: channel instead. then we can have one future with write access to a left-right? let connection = Arc::clone(connection); let block_sender = block_sender.clone(); - tokio::spawn(async move { - let url = connection.url().to_string(); + task::Builder::default() + .name("subscribe_new_heads") + .spawn(async move { + let url = connection.url().to_string(); - // loop to automatically reconnect - // TODO: make this cancellable? - loop { - // TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date - if let Err(e) = connection - .clone() - .subscribe_new_heads(block_sender.clone(), true) - .await - { - warn!("new_heads error on {}: {:?}", url, e); + // loop to automatically reconnect + // TODO: make this cancellable? + loop { + // TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date + if let Err(e) = connection + .clone() + .subscribe_new_heads(block_sender.clone(), true) + .await + { + warn!("new_heads error on {}: {:?}", url, e); + } } - } - }); + }); } } @@ -216,11 +219,13 @@ impl Web3Connections { if subscribe_heads { let connections = Arc::clone(&connections); - tokio::spawn(async move { - connections - .update_synced_rpcs(block_receiver, synced_connections_writer) - .await - }); + task::Builder::default() + .name("update_synced_rpcs") + .spawn(async move { + connections + .update_synced_rpcs(block_receiver, synced_connections_writer) + .await + }); } Ok(connections) @@ -248,17 +253,19 @@ impl Web3Connections { let params = params.clone(); let response_sender = response_sender.clone(); - let handle = tokio::spawn(async move { - let response: Box = - active_request_handle.request(&method, ¶ms).await?; + let handle = task::Builder::default() + .name("send_request") + .spawn(async move { + let response: Box = + active_request_handle.request(&method, ¶ms).await?; - // send the first good response to a one shot channel. that way we respond quickly - // drop the result because errors are expected after the first send - response_sender - .send_async(Ok(response)) - .await - .map_err(Into::into) - }); + // send the first good response to a one shot channel. that way we respond quickly + // drop the result because errors are expected after the first send + response_sender + .send_async(Ok(response)) + .await + .map_err(Into::into) + }); unordered_futures.push(handle); } diff --git a/web3-proxy/src/main.rs b/web3-proxy/src/main.rs index cee3b760..dd6db68c 100644 --- a/web3-proxy/src/main.rs +++ b/web3-proxy/src/main.rs @@ -22,7 +22,8 @@ use crate::config::{CliConfig, RpcConfig}; fn main() -> anyhow::Result<()> { // install global collector configured based on RUST_LOG env var. - tracing_subscriber::fmt::init(); + // tracing_subscriber::fmt::init(); + console_subscriber::init(); let cli_config: CliConfig = argh::from_env(); @@ -35,8 +36,10 @@ fn main() -> anyhow::Result<()> { let chain_id = rpc_config.shared.chain_id; + // TODO: get worker_threads from config let rt = runtime::Builder::new_multi_thread() .enable_all() + .worker_threads(8) .thread_name_fn(move || { static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); // TODO: what ordering? i think we want seqcst so that these all happen in order, but that might be stricter than we really need