add tokio-console

This commit is contained in:
Bryan Stitt 2022-05-17 16:23:27 +00:00
parent 58643d605c
commit 315b751dc7
6 changed files with 387 additions and 37 deletions

325
Cargo.lock generated

@ -118,6 +118,27 @@ dependencies = [
"term",
]
[[package]]
name = "async-stream"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dad5c83079eae9969be7fadefe640a1c566901f05ff91ab221de4b6f68d9507e"
dependencies = [
"async-stream-impl",
"futures-core",
]
[[package]]
name = "async-stream-impl"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10f203db73a71dfa2fb6dd22763990fa26f3d2625a6da2da900d23b87d26be27"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "async-trait"
version = "0.1.53"
@ -169,6 +190,49 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "axum"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab2504b827a8bef941ba3dd64bdffe9cf56ca182908a147edd6189c95fbcae7d"
dependencies = [
"async-trait",
"axum-core",
"bitflags",
"bytes",
"futures-util",
"http",
"http-body",
"hyper",
"itoa 1.0.1",
"matchit",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"serde",
"sync_wrapper",
"tokio",
"tower",
"tower-http",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum-core"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da31c0ed7b4690e2c78fe4b880d21cd7db04a346ebc658b4270251b695437f17"
dependencies = [
"async-trait",
"bytes",
"futures-util",
"http",
"http-body",
"mime",
]
[[package]]
name = "backtrace"
version = "0.3.65"
@ -607,6 +671,43 @@ dependencies = [
"winapi",
]
[[package]]
name = "console-api"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24cb05777feccbb2642d4f2df44d0505601a2cd88ca517d8c913f263a5a8dc8b"
dependencies = [
"prost",
"prost-types",
"tonic",
"tracing-core",
]
[[package]]
name = "console-subscriber"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f21a16ee925aa9d2bad2e296beffd6c5b1bfaad50af509d305b8e7f23af20fb"
dependencies = [
"console-api",
"crossbeam-channel",
"crossbeam-utils",
"futures",
"hdrhistogram",
"humantime",
"parking_lot 0.11.2",
"prost-types",
"serde",
"serde_json",
"thread_local",
"tokio",
"tokio-stream",
"tonic",
"tracing",
"tracing-core",
"tracing-subscriber",
]
[[package]]
name = "const-oid"
version = "0.7.1"
@ -634,6 +735,15 @@ dependencies = [
"libc",
]
[[package]]
name = "crc32fast"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d"
dependencies = [
"cfg-if",
]
[[package]]
name = "criterion"
version = "0.3.5"
@ -1305,6 +1415,18 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "279fb028e20b3c4c320317955b77c5e0c9701f05a1d309905d6fc702cdc5053e"
[[package]]
name = "flate2"
version = "1.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b39522e96686d38f4bc984b9198e3a0613264abaebaff2c5c918bfa6b6da09af"
dependencies = [
"cfg-if",
"crc32fast",
"libc",
"miniz_oxide",
]
[[package]]
name = "flume"
version = "0.10.12"
@ -1598,6 +1720,19 @@ dependencies = [
"hashbrown 0.12.1",
]
[[package]]
name = "hdrhistogram"
version = "7.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31672b7011be2c4f7456c4ddbcb40e7e9a4a9fad8efe49a6ebaf5f307d0109c0"
dependencies = [
"base64 0.13.0",
"byteorder",
"flate2",
"nom",
"num-traits",
]
[[package]]
name = "headers"
version = "0.3.7"
@ -1703,6 +1838,12 @@ dependencies = [
"pin-project-lite",
]
[[package]]
name = "http-range-header"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29"
[[package]]
name = "httparse"
version = "1.7.1"
@ -1715,6 +1856,12 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
[[package]]
name = "humantime"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "hyper"
version = "0.14.18"
@ -1752,6 +1899,18 @@ dependencies = [
"tokio-rustls",
]
[[package]]
name = "hyper-timeout"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1"
dependencies = [
"hyper",
"pin-project-lite",
"tokio",
"tokio-io-timeout",
]
[[package]]
name = "idna"
version = "0.2.3"
@ -2007,12 +2166,27 @@ dependencies = [
"libc",
]
[[package]]
name = "matchers"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
"regex-automata",
]
[[package]]
name = "matches"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"
[[package]]
name = "matchit"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb"
[[package]]
name = "md-5"
version = "0.10.1"
@ -2053,6 +2227,12 @@ dependencies = [
"unicase",
]
[[package]]
name = "minimal-lexical"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "miniz_oxide"
version = "0.5.1"
@ -2124,6 +2304,16 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c"
[[package]]
name = "nom"
version = "7.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8903e5a29a317527874d0402f867152a3d21c908bb0b933e416c65e301d4c36"
dependencies = [
"memchr",
"minimal-lexical",
]
[[package]]
name = "nonzero_ext"
version = "0.3.0"
@ -2598,6 +2788,39 @@ dependencies = [
"winapi",
]
[[package]]
name = "prost"
version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc03e116981ff7d8da8e5c220e374587b98d294af7ba7dd7fda761158f00086f"
dependencies = [
"bytes",
"prost-derive",
]
[[package]]
name = "prost-derive"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b670f45da57fb8542ebdbb6105a925fe571b67f9e7ed9f47a06a84e72b4e7cc"
dependencies = [
"anyhow",
"itertools",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "prost-types"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d0a014229361011dc8e69c8a1ec6c2e8d0f2af7c91e3ea3f5b2170298461e68"
dependencies = [
"bytes",
"prost",
]
[[package]]
name = "quanta"
version = "0.9.3"
@ -2740,6 +2963,9 @@ name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
"regex-syntax",
]
[[package]]
name = "regex-syntax"
@ -3311,6 +3537,12 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "sync_wrapper"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8"
[[package]]
name = "tap"
version = "1.0.1"
@ -3467,9 +3699,20 @@ dependencies = [
"signal-hook-registry",
"socket2",
"tokio-macros",
"tracing",
"winapi",
]
[[package]]
name = "tokio-io-timeout"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf"
dependencies = [
"pin-project-lite",
"tokio",
]
[[package]]
name = "tokio-macros"
version = "1.7.0"
@ -3569,6 +3812,83 @@ dependencies = [
"serde",
]
[[package]]
name = "tonic"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5be9d60db39854b30b835107500cf0aca0b0d14d6e1c3de124217c23a29c2ddb"
dependencies = [
"async-stream",
"async-trait",
"axum",
"base64 0.13.0",
"bytes",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"hyper-timeout",
"percent-encoding",
"pin-project",
"prost",
"prost-derive",
"tokio",
"tokio-stream",
"tokio-util 0.7.1",
"tower",
"tower-layer",
"tower-service",
"tracing",
"tracing-futures",
]
[[package]]
name = "tower"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a89fd63ad6adf737582df5db40d286574513c69a11dac5214dc3b5603d6713e"
dependencies = [
"futures-core",
"futures-util",
"indexmap",
"pin-project",
"pin-project-lite",
"rand",
"slab",
"tokio",
"tokio-util 0.7.1",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower-http"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d342c6d58709c0a6d48d48dabbb62d4ef955cf5f0f3bbfd845838e7ae88dbae"
dependencies = [
"bitflags",
"bytes",
"futures-core",
"futures-util",
"http",
"http-body",
"http-range-header",
"pin-project-lite",
"tower",
"tower-layer",
"tower-service",
]
[[package]]
name = "tower-layer"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62"
[[package]]
name = "tower-service"
version = "0.3.1"
@ -3637,10 +3957,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bc28f93baff38037f64e6f43d34cfa1605f27a49c34e8a04c5e78b0babf2596"
dependencies = [
"ansi_term",
"lazy_static",
"matchers",
"parking_lot 0.12.0",
"regex",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
]
@ -3966,6 +4290,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"argh",
"console-subscriber",
"dashmap",
"derive_more",
"ethers",

@ -9,6 +9,7 @@ edition = "2021"
anyhow = "1.0.57"
argh = "0.1.7"
# axum = "*" # TODO: use this instead of warp?
console-subscriber = { version = "0.1.5", features = ["parking_lot"] }
dashmap = "5.3.3"
derive_more = "0.99.17"
ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "ws"] }
@ -28,7 +29,7 @@ reqwest = { version = "0.11.10", default-features = false, features = ["json", "
rustc-hash = "1.1.0"
serde = { version = "1.0.137", features = [] }
serde_json = { version = "1.0.81", default-features = false, features = ["alloc", "raw_value"] }
tokio = { version = "1.18.2", features = ["full"] }
tokio = { version = "1.18.2", features = ["full", "tracing"] }
toml = "0.5.9"
tracing = "0.1.34"
# TODO: tracing-subscriber has serde and serde_json features that we might want to use

@ -15,6 +15,7 @@ use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tokio::task;
use tokio::time::sleep;
use tracing::{debug, instrument, trace, warn};
@ -140,7 +141,9 @@ impl Web3ProxyApp {
.into_iter()
.map(|request| {
let clone = self.clone();
tokio::spawn(async move { clone.proxy_web3_rpc_request(request).await })
task::Builder::default()
.name("proxy_web3_rpc_request")
.spawn(async move { clone.proxy_web3_rpc_request(request).await })
})
.collect::<Vec<_>>(),
)
@ -177,11 +180,18 @@ impl Web3ProxyApp {
// TODO: benchmark this compared to waiting on unbounded futures
// TODO: do something with this handle?
tokio::spawn(async move {
connections
.try_send_parallel_requests(active_request_handles, method, params, tx)
.await
});
task::Builder::default()
.name("try_send_parallel_requests")
.spawn(async move {
connections
.try_send_parallel_requests(
active_request_handles,
method,
params,
tx,
)
.await
});
// wait for the first response
// TODO: we don't want the first response. we want the quorum response

@ -12,6 +12,7 @@ use std::num::NonZeroU32;
use std::sync::atomic::{self, AtomicU32};
use std::{cmp::Ordering, sync::Arc};
use tokio::sync::RwLock;
use tokio::task;
use tokio::time::{interval, sleep, timeout_at, Duration, Instant, MissedTickBehavior};
use tracing::{info, instrument, trace, warn};
@ -309,6 +310,9 @@ impl Web3Connection {
{
Ok(Some(new_block)) => {
self.send_block(Ok(new_block), &block_sender).await;
// TODO: really not sure about this
task::yield_now().await;
}
Ok(None) => {
warn!("subscription ended");

@ -11,6 +11,7 @@ use serde_json::value::RawValue;
use std::cmp;
use std::fmt;
use std::sync::Arc;
use tokio::task;
use tracing::{info, instrument, trace, warn};
use crate::config::Web3ConnectionConfig;
@ -185,22 +186,24 @@ impl Web3Connections {
// TODO: channel instead. then we can have one future with write access to a left-right?
let connection = Arc::clone(connection);
let block_sender = block_sender.clone();
tokio::spawn(async move {
let url = connection.url().to_string();
task::Builder::default()
.name("subscribe_new_heads")
.spawn(async move {
let url = connection.url().to_string();
// loop to automatically reconnect
// TODO: make this cancellable?
loop {
// TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date
if let Err(e) = connection
.clone()
.subscribe_new_heads(block_sender.clone(), true)
.await
{
warn!("new_heads error on {}: {:?}", url, e);
// loop to automatically reconnect
// TODO: make this cancellable?
loop {
// TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date
if let Err(e) = connection
.clone()
.subscribe_new_heads(block_sender.clone(), true)
.await
{
warn!("new_heads error on {}: {:?}", url, e);
}
}
}
});
});
}
}
@ -216,11 +219,13 @@ impl Web3Connections {
if subscribe_heads {
let connections = Arc::clone(&connections);
tokio::spawn(async move {
connections
.update_synced_rpcs(block_receiver, synced_connections_writer)
.await
});
task::Builder::default()
.name("update_synced_rpcs")
.spawn(async move {
connections
.update_synced_rpcs(block_receiver, synced_connections_writer)
.await
});
}
Ok(connections)
@ -248,17 +253,19 @@ impl Web3Connections {
let params = params.clone();
let response_sender = response_sender.clone();
let handle = tokio::spawn(async move {
let response: Box<RawValue> =
active_request_handle.request(&method, &params).await?;
let handle = task::Builder::default()
.name("send_request")
.spawn(async move {
let response: Box<RawValue> =
active_request_handle.request(&method, &params).await?;
// send the first good response to a one shot channel. that way we respond quickly
// drop the result because errors are expected after the first send
response_sender
.send_async(Ok(response))
.await
.map_err(Into::into)
});
// send the first good response to a one shot channel. that way we respond quickly
// drop the result because errors are expected after the first send
response_sender
.send_async(Ok(response))
.await
.map_err(Into::into)
});
unordered_futures.push(handle);
}

@ -22,7 +22,8 @@ use crate::config::{CliConfig, RpcConfig};
fn main() -> anyhow::Result<()> {
// install global collector configured based on RUST_LOG env var.
tracing_subscriber::fmt::init();
// tracing_subscriber::fmt::init();
console_subscriber::init();
let cli_config: CliConfig = argh::from_env();
@ -35,8 +36,10 @@ fn main() -> anyhow::Result<()> {
let chain_id = rpc_config.shared.chain_id;
// TODO: get worker_threads from config
let rt = runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(8)
.thread_name_fn(move || {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
// TODO: what ordering? i think we want seqcst so that these all happen in order, but that might be stricter than we really need