From 05563b46b1d8e2bfe73057694f841a5f4c2970eb Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Wed, 18 May 2022 16:35:06 +0000
Subject: [PATCH] less spawns and more logs

---
 config/example.toml                     | 33 +++------
 web3-proxy/examples/subscribe_blocks.rs | 10 ++-
 web3-proxy/examples/watch_blocks.rs     |  8 +-
 web3-proxy/src/app.rs                   | 36 ++++-----
 web3-proxy/src/config.rs                |  4 +
 web3-proxy/src/connection.rs            |  8 +-
 web3-proxy/src/connections.rs           | 99 +++++++++++++++----------
 web3-proxy/src/main.rs                  | 41 ++++++----
 8 files changed, 131 insertions(+), 108 deletions(-)

diff --git a/config/example.toml b/config/example.toml
index 7acc7902..2bcd3479 100644
--- a/config/example.toml
+++ b/config/example.toml
@@ -4,36 +4,21 @@ chain_id = 1
 [balanced_rpcs]
 
     [balanced_rpcs.erigon_archive]
+        url = "http://127.0.0.1:8549"
+        # TODO: double check soft_limit on erigon
+        soft_limit = 100_000
+
+    [balanced_rpcs.erigon_archive_ws]
         url = "ws://127.0.0.1:8549"
         # TODO: double check soft_limit on erigon
        soft_limit = 100_000
 
     [balanced_rpcs.geth]
+        url = "http://127.0.0.1:8545"
+        soft_limit = 200_000
+
+    [balanced_rpcs.geth_ws]
         url = "ws://127.0.0.1:8546"
         soft_limit = 200_000
 
-    [balanced_rpcs.ankr]
-        url = "https://rpc.ankr.com/eth"
-        soft_limit = 3_000
-
 [private_rpcs]
-
-    [private_rpcs.eden]
-        url = "https://api.edennetwork.io/v1/"
-        soft_limit = 1_805
-
-    [private_rpcs.eden_beta]
-        url = "https://api.edennetwork.io/v1/beta"
-        soft_limit = 5_861
-
-    [private_rpcs.ethermine]
-        url = "https://rpc.ethermine.org"
-        soft_limit = 5_861
-
-    [private_rpcs.flashbots]
-        url = "https://rpc.flashbots.net"
-        soft_limit = 7074
-
-    [private_rpcs.securerpc]
-        url = "https://gibson.securerpc.com/v1"
-        soft_limit = 4560
diff --git a/web3-proxy/examples/subscribe_blocks.rs b/web3-proxy/examples/subscribe_blocks.rs
index b4602212..01498c26 100644
--- a/web3-proxy/examples/subscribe_blocks.rs
+++ b/web3-proxy/examples/subscribe_blocks.rs
@@ -4,10 +4,14 @@ use std::time::Duration;
 
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
+    console_subscriber::init();
+
+    fdlimit::raise_fd_limit();
+
     // erigon
-    let url = "ws://10.11.12.16:8545";
+    // let url = "ws://10.11.12.16:8548";
     // geth
-    // let url = "ws://10.11.12.16:8946";
+    let url = "ws://10.11.12.16:8546";
 
     println!("Subscribing to blocks from {}", url);
 
@@ -15,7 +19,7 @@ async fn main() -> anyhow::Result<()> {
 
     let provider = Provider::new(provider).interval(Duration::from_secs(1));
 
-    let mut stream = provider.subscribe_blocks().await?.take(3);
+    let mut stream = provider.subscribe_blocks().await?;
     while let Some(block) = stream.next().await {
         println!(
             "{:?} = Ts: {:?}, block number: {}",
diff --git a/web3-proxy/examples/watch_blocks.rs b/web3-proxy/examples/watch_blocks.rs
index 7fa3ffe2..7488b020 100644
--- a/web3-proxy/examples/watch_blocks.rs
+++ b/web3-proxy/examples/watch_blocks.rs
@@ -4,10 +4,14 @@ use std::{str::FromStr, time::Duration};
 
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
+    console_subscriber::init();
+
+    fdlimit::raise_fd_limit();
+
     // erigon does not support most filters
     // let url = "http://10.11.12.16:8545";
     // geth
-    let url = "http://10.11.12.16:8945";
+    let url = "http://10.11.12.16:8545";
 
     println!("Watching blocks from {:?}", url);
 
@@ -15,7 +19,7 @@ async fn main() -> anyhow::Result<()> {
 
     let provider = Provider::new(provider).interval(Duration::from_secs(1));
 
-    let mut stream = provider.watch_blocks().await?.take(3);
+    let mut stream = provider.watch_blocks().await?;
     while let Some(block_number) = stream.next().await {
         let block = provider.get_block(block_number).await?.unwrap();
         println!(
diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs
index 614f5361..7149e874 100644
--- a/web3-proxy/src/app.rs
+++ b/web3-proxy/src/app.rs
@@ -71,27 +71,28 @@ impl Web3ProxyApp {
         // TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server
         let http_client = reqwest::ClientBuilder::new()
             .connect_timeout(Duration::from_secs(5))
-            .timeout(Duration::from_secs(300))
+            .timeout(Duration::from_secs(60))
             .user_agent(APP_USER_AGENT)
             .build()?;
 
         // TODO: attach context to this error
-        let balanced_rpcs = Web3Connections::try_new(
-            chain_id,
-            balanced_rpcs,
-            Some(http_client.clone()),
-            &clock,
-            true,
-        )
-        .await?;
+        let balanced_rpcs =
+            Web3Connections::try_new(chain_id, balanced_rpcs, Some(http_client.clone()), &clock)
+                .await?;
+
+        {
+            let balanced_rpcs = balanced_rpcs.clone();
+            task::spawn(async move {
+                balanced_rpcs.subscribe_heads().await;
+            });
+        }
 
         // TODO: attach context to this error
         let private_rpcs = if private_rpcs.is_empty() {
             warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");
             balanced_rpcs.clone()
         } else {
-            Web3Connections::try_new(chain_id, private_rpcs, Some(http_client), &clock, false)
-                .await?
+            Web3Connections::try_new(chain_id, private_rpcs, Some(http_client), &clock).await?
         };
 
         Ok(Web3ProxyApp {
@@ -140,12 +141,7 @@ impl Web3ProxyApp {
         let responses = join_all(
             requests
                 .into_iter()
-                .map(|request| {
-                    let clone = self.clone();
-                    task::Builder::default()
-                        .name("proxy_web3_rpc_request")
-                        .spawn(async move { clone.proxy_web3_rpc_request(request).await })
-                })
+                .map(|request| self.clone().proxy_web3_rpc_request(request))
                 .collect::<Vec<_>>(),
         )
         .await;
@@ -153,7 +149,7 @@
         // TODO: i'm sure this could be done better with iterators
         let mut collected: Vec<JsonRpcForwardedResponse> = Vec::with_capacity(num_requests);
         for response in responses {
-            collected.push(response??);
+            collected.push(response?);
         }
 
         Ok(collected)
@@ -166,8 +162,6 @@
     ) -> anyhow::Result<JsonRpcForwardedResponse> {
         trace!("Received request: {:?}", request);
 
-        // TODO: apparently json_body can be a vec of multiple requests. should we split them up? we need to respond with a Vec too
-
         if request.method == "eth_sendRawTransaction" {
             // there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs
             // TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit
@@ -231,7 +225,7 @@
         // TODO: how much should we retry?
         for i in 0..10 {
            // TODO: think more about this loop.
-            // TODO: set tracing span to have the loop count in it
+            // TODO: add more to this span
             let span = info_span!("i", i);
             let _enter = span.enter();
 
diff --git a/web3-proxy/src/config.rs b/web3-proxy/src/config.rs
index 9b7a1a49..1b2d9bd5 100644
--- a/web3-proxy/src/config.rs
+++ b/web3-proxy/src/config.rs
@@ -15,6 +15,10 @@ pub struct CliConfig {
     #[argh(option, default = "8544")]
     pub listen_port: u16,
 
+    /// number of worker threads
+    #[argh(option, default = "0")]
+    pub worker_threads: usize,
+
     /// path to a toml of rpc servers
     #[argh(option, default = "\"./config/example.toml\".to_string()")]
     pub rpc_config_path: String,
diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs
index 05cd1b86..54f2130b 100644
--- a/web3-proxy/src/connection.rs
+++ b/web3-proxy/src/connection.rs
@@ -415,9 +415,10 @@ impl ActiveRequestHandle {
         T: fmt::Debug + serde::Serialize + Send + Sync,
         R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug,
     {
-        // TODO: this should probably be trace level and use a span
+        // TODO: use tracing spans properly
         // TODO: it would be nice to have the request id on this
-        trace!("Sending {}({:?}) to {}", method, params, self.0);
+        // TODO: including params in this is way too verbose
+        trace!("Sending {} to {}", method, self.0);
 
         let provider = self.0.provider.read().await.clone();
 
@@ -428,7 +429,8 @@ impl ActiveRequestHandle {
 
         // TODO: i think ethers already has trace logging (and does it much more fancy)
         // TODO: at least instrument this with more useful information
-        trace!("Response from {}: {:?}", self.0, response);
+        // trace!("Reply from {}: {:?}", self.0, response);
+        trace!("Reply from {}", self.0);
 
         response
     }
diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs
index 0a714ca9..023d713a 100644
--- a/web3-proxy/src/connections.rs
+++ b/web3-proxy/src/connections.rs
@@ -1,6 +1,7 @@
 ///! Load balanced communication with a group of web3 providers
 use derive_more::From;
 use ethers::prelude::H256;
+use futures::future::join_all;
 use futures::stream::FuturesUnordered;
 use futures::StreamExt;
 use governor::clock::{QuantaClock, QuantaInstant};
@@ -11,8 +12,10 @@ use serde_json::value::RawValue;
 use std::cmp;
 use std::fmt;
 use std::sync::Arc;
+use tokio::sync::Mutex;
 use tokio::task;
-use tracing::{info, instrument, trace, warn};
+use tracing::Instrument;
+use tracing::{info, info_span, instrument, trace, warn};
 
 use crate::config::Web3ConnectionConfig;
 use crate::connection::{ActiveRequestHandle, Web3Connection};
@@ -88,6 +91,8 @@ impl SyncedConnections {
         // TODO: better log
         if log {
             trace!("Now synced: {:?}", self.inner);
+        } else {
+            trace!("Now synced #2: {:?}", self.inner);
         }
     }
 }
@@ -134,6 +139,7 @@ impl Absorb<SyncedConnectionsOp> for SyncedConnections {
 pub struct Web3Connections {
     inner: Vec<Arc<Web3Connection>>,
     synced_connections_reader: ReadHandleFactory<SyncedConnections>,
+    synced_connections_writer: Mutex<WriteHandle<SyncedConnections, SyncedConnectionsOp>>,
 }
 
 impl fmt::Debug for Web3Connections {
@@ -152,7 +158,6 @@ impl Web3Connections {
         servers: Vec<Web3ConnectionConfig>,
         http_client: Option<reqwest::Client>,
         clock: &QuantaClock,
-        subscribe_heads: bool,
     ) -> anyhow::Result<Arc<Self>> {
         let num_connections = servers.len();
 
@@ -175,60 +180,62 @@ impl Web3Connections {
             ));
         }
 
-        let (block_sender, block_receiver) = flume::unbounded();
-
         let (mut synced_connections_writer, synced_connections_reader) = left_right::new::<SyncedConnections, SyncedConnectionsOp>();
 
-        if subscribe_heads {
-            for connection in connections.iter() {
-                // subscribe to new heads in a spawned future
-                // TODO: channel instead. then we can have one future with write access to a left-right?
-                let connection = Arc::clone(connection);
-                let block_sender = block_sender.clone();
-                task::Builder::default()
-                    .name("subscribe_new_heads")
-                    .spawn(async move {
-                        let url = connection.url().to_string();
-
-                        // loop to automatically reconnect
-                        // TODO: make this cancellable?
-                        loop {
-                            // TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date
-                            if let Err(e) = connection
-                                .clone()
-                                .subscribe_new_heads(block_sender.clone(), true)
-                                .await
-                            {
-                                warn!("new_heads error on {}: {:?}", url, e);
-                            }
-                        }
-                    });
-            }
-        }
-
         synced_connections_writer.append(SyncedConnectionsOp::SyncedConnectionsCapacity(
             num_connections,
         ));
 
+        trace!("publishing synced connections");
         synced_connections_writer.publish();
+        trace!("published synced connections");
 
         let connections = Arc::new(Self {
             inner: connections,
             synced_connections_reader: synced_connections_reader.factory(),
+            synced_connections_writer: Mutex::new(synced_connections_writer),
         });
 
-        if subscribe_heads {
-            let connections = Arc::clone(&connections);
-            task::Builder::default()
-                .name("update_synced_rpcs")
+        Ok(connections)
+    }
+
+    pub async fn subscribe_heads(self: &Arc<Self>) {
+        let (block_sender, block_receiver) = flume::unbounded();
+
+        let mut handles = vec![];
+
+        for connection in self.inner.iter() {
+            // subscribe to new heads in a spawned future
+            // TODO: channel instead. then we can have one future with write access to a left-right?
+            let connection = Arc::clone(connection);
+            let block_sender = block_sender.clone();
+
+            // let url = connection.url().to_string();
+
+            let handle = task::Builder::default()
+                .name("subscribe_new_heads")
                 .spawn(async move {
-                    connections
-                        .update_synced_rpcs(block_receiver, synced_connections_writer)
+                    // loop to automatically reconnect
+                    // TODO: make this cancellable?
+                    // TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date
+                    // TODO: proper spann
+                    connection
+                        .subscribe_new_heads(block_sender.clone(), true)
+                        .instrument(tracing::info_span!("url"))
                         .await
                 });
+
+            handles.push(handle);
         }
 
-        Ok(connections)
+        let connections = Arc::clone(self);
+        let handle = task::Builder::default()
+            .name("update_synced_rpcs")
+            .spawn(async move { connections.update_synced_rpcs(block_receiver).await });
+
+        handles.push(handle);
+
+        join_all(handles).await;
     }
 
     pub fn get_synced_rpcs(&self) -> left_right::ReadHandle<SyncedConnections> {
@@ -306,14 +313,18 @@ impl Web3Connections {
     async fn update_synced_rpcs(
         &self,
         block_receiver: flume::Receiver<(u64, H256, Arc<Web3Connection>)>,
-        mut synced_connections_writer: WriteHandle<SyncedConnections, SyncedConnectionsOp>,
     ) -> anyhow::Result<()> {
+        let mut synced_connections_writer = self.synced_connections_writer.lock().await;
+
         while let Ok((new_block_num, new_block_hash, rpc)) = block_receiver.recv_async().await {
             if new_block_num == 0 {
                 warn!("{} is still syncing", rpc);
                 continue;
             }
 
+            let span = info_span!("new_block_num", new_block_num,);
+            let _enter = span.enter();
+
             synced_connections_writer.append(SyncedConnectionsOp::SyncedConnectionsUpdate(
                 new_block_num,
                 new_block_hash,
             ));
 
             // TODO: only publish when the second block arrives?
+            // TODO: use spans properly
+            trace!("publishing synced connections for block {}", new_block_num,);
             synced_connections_writer.publish();
+            trace!(
+                "published synced connections for block {} from {}",
+                new_block_num,
+                "some rpc"
+            );
         }
 
+        // TODO: if there was an error, we should return it
+        warn!("block_receiver exited!");
+
         Ok(())
     }
diff --git a/web3-proxy/src/main.rs b/web3-proxy/src/main.rs
index a5a5d4ca..929a566c 100644
--- a/web3-proxy/src/main.rs
+++ b/web3-proxy/src/main.rs
@@ -7,6 +7,7 @@ mod jsonrpc;
 use jsonrpc::{JsonRpcErrorData, JsonRpcForwardedResponse};
 use parking_lot::deadlock;
 use serde_json::value::RawValue;
+use std::env;
 use std::fs;
 use std::sync::atomic::{self, AtomicUsize};
 use std::sync::Arc;
@@ -21,8 +22,12 @@ use crate::app::Web3ProxyApp;
 use crate::config::{CliConfig, RpcConfig};
 
 fn main() -> anyhow::Result<()> {
+    // TODO: is there a better way to do this?
+    if env::var("RUST_LOG").is_err() {
+        env::set_var("RUST_LOG", "web3_proxy=debug");
+    }
+
     // install global collector configured based on RUST_LOG env var.
-    // TODO: if RUST_LOG isn't set, set it to "web3_proxy=debug" or something
     console_subscriber::init();
 
     fdlimit::raise_fd_limit();
@@ -38,18 +43,22 @@ fn main() -> anyhow::Result<()> {
 
     let chain_id = rpc_config.shared.chain_id;
 
-    // TODO: get worker_threads from config
-    let rt = runtime::Builder::new_multi_thread()
-        .enable_all()
-        .worker_threads(8)
-        .thread_name_fn(move || {
-            static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
-            // TODO: what ordering? i think we want seqcst so that these all happen in order, but that might be stricter than we really need
-            let worker_id = ATOMIC_ID.fetch_add(1, atomic::Ordering::SeqCst);
-            // TODO: i think these max at 15 characters
-            format!("web3-{}-{}", chain_id, worker_id)
-        })
-        .build()?;
+    // TODO: multithreaded runtime once i'm done debugging
+    let mut rt_builder = runtime::Builder::new_current_thread();
+
+    rt_builder.enable_all().thread_name_fn(move || {
+        static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
+        // TODO: what ordering? i think we want seqcst so that these all happen in order, but that might be stricter than we really need
+        let worker_id = ATOMIC_ID.fetch_add(1, atomic::Ordering::SeqCst);
+        // TODO: i think these max at 15 characters
+        format!("web3-{}-{}", chain_id, worker_id)
+    });
+
+    if cli_config.worker_threads > 0 {
+        rt_builder.worker_threads(cli_config.worker_threads);
+    }
+
+    let rt = rt_builder.build()?;
 
     // spawn a thread for deadlock detection
     thread::spawn(move || loop {
@@ -73,7 +82,7 @@ fn main() -> anyhow::Result<()> {
     rt.block_on(async {
         let listen_port = cli_config.listen_port;
 
-        let app = rpc_config.try_build().await.unwrap();
+        let app = rpc_config.try_build().await?;
 
         let app: Arc<Web3ProxyApp> = Arc::new(app);
 
@@ -89,9 +98,9 @@ fn main() -> anyhow::Result<()> {
         let routes = proxy_rpc_filter.map(handle_anyhow_errors);
 
         warp::serve(routes).run(([0, 0, 0, 0], listen_port)).await;
-    });
 
-    Ok(())
+        Ok(())
+    })
 }
 
 /// convert result into a jsonrpc error. use this at the end of your warp filter