From 997d4fa5bbad5f0f478088bb8afed7cd3fcf5d91 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 17 May 2022 17:15:18 +0000 Subject: [PATCH] less instrumenting --- web3-proxy/src/app.rs | 65 ++++++++++++++++++++++++++--------- web3-proxy/src/config.rs | 6 ++-- web3-proxy/src/connections.rs | 2 +- web3-proxy/src/main.rs | 2 +- 4 files changed, 53 insertions(+), 22 deletions(-) diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index 06b47c89..614f5361 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -17,6 +17,7 @@ use std::time::Duration; use tokio::sync::watch; use tokio::task; use tokio::time::sleep; +use tracing::info_span; use tracing::{debug, instrument, trace, warn}; static APP_USER_AGENT: &str = concat!( @@ -57,7 +58,7 @@ impl fmt::Debug for Web3ProxyApp { } impl Web3ProxyApp { - #[instrument(name = "try_new_Web3ProxyApp", skip_all)] + // #[instrument(name = "try_new_Web3ProxyApp", skip_all)] pub async fn try_new( chain_id: usize, balanced_rpcs: Vec, @@ -126,7 +127,7 @@ impl Web3ProxyApp { Ok(warp::reply::json(&response)) } - #[instrument(skip_all)] + // #[instrument(skip_all)] async fn proxy_web3_rpc_requests( self: Arc, requests: Vec, @@ -158,7 +159,7 @@ impl Web3ProxyApp { Ok(collected) } - #[instrument(skip_all)] + // #[instrument(skip_all)] async fn proxy_web3_rpc_request( self: Arc, request: JsonRpcRequest, @@ -228,10 +229,16 @@ impl Web3ProxyApp { } else { // this is not a private transaction (or no private relays are configured) // TODO: how much should we retry? - for _ in 0..10 { + for i in 0..10 { // TODO: think more about this loop. + // TODO: set tracing span to have the loop count in it + let span = info_span!("i", i); + let _enter = span.enter(); // todo: move getting a cache_key or the result into a helper function. then we could have multiple caches + // TODO: i think we are maybe getting stuck on this lock. maybe a new block arrives, it tries to write and gets hung up on something. then this can't proceed + trace!("{:?} waiting for best_block_hash", request); + let best_block_hash = self .balanced_rpcs .get_synced_rpcs() @@ -239,6 +246,8 @@ impl Web3ProxyApp { .map(|x| *x.get_head_block_hash()) .unwrap(); + trace!("{:?} best_block_hash {}", request, best_block_hash); + // TODO: building this cache key is slow and its large, but i don't see a better way right now // TODO: inspect the params and see if a block is specified. if so, use that block number instead of current_block let cache_key = ( @@ -252,9 +261,11 @@ impl Web3ProxyApp { let _ = self.active_requests.remove(&cache_key); // TODO: emit a stat - trace!("cache hit!"); + trace!("{:?} cache hit!", request); return Ok(cached.to_owned()); + } else { + trace!("{:?} cache miss!", request); } // check if this request is already in flight @@ -271,6 +282,8 @@ impl Web3ProxyApp { if let Some(mut other_in_flight_rx) = other_in_flight_rx { // wait for the other request to finish. it can finish successfully or with an error + trace!("{:?} waiting on in-flight request", request); + let _ = other_in_flight_rx.changed().await; // now that we've waited, lets check the cache again @@ -278,7 +291,19 @@ impl Web3ProxyApp { let _ = self.active_requests.remove(&cache_key); let _ = in_flight_tx.send(false); + // TODO: emit a stat + trace!( + "{:?} cache hit after waiting for in-flight request!", + request + ); + return Ok(cached.to_owned()); + } else { + // TODO: emit a stat + trace!( + "{:?} cache miss after waiting for in-flight request!", + request + ); } } @@ -291,7 +316,7 @@ impl Web3ProxyApp { let response = match response { Ok(partial_response) => { // TODO: trace here was really slow with millions of requests. - // info!("forwarding request from {}", upstream_server); + // trace!("forwarding request from {}", upstream_server); let response = JsonRpcForwardedResponse { jsonrpc: "2.0".to_string(), @@ -383,15 +408,18 @@ impl Web3ProxyApp { } }; - if response.error.is_some() { - trace!("Sending error reply: {:?}", response); - } else { - trace!("Sending reply: {:?}", response); - } - // TODO: needing to remove manually here makes me think we should do this differently let _ = self.active_requests.remove(&cache_key); - let _ = in_flight_tx.send(false); + + if response.error.is_some() { + trace!("Sending error reply: {:?}", response); + + // errors already sent false to the in_flight_tx + } else { + trace!("Sending reply: {:?}", response); + + let _ = in_flight_tx.send(false); + } return Ok(response); } @@ -416,12 +444,15 @@ impl Web3ProxyApp { sleep(deadline).await; warn!("All rate limits exceeded. Sleeping"); + + // TODO: needing to remove manually here makes me think we should do this differently + let _ = self.active_requests.remove(&cache_key); + let _ = in_flight_tx.send(false); + + // continue + continue; } } - - // TODO: needing to remove manually here makes me think we should do this differently - let _ = self.active_requests.remove(&cache_key); - let _ = in_flight_tx.send(false); } } diff --git a/web3-proxy/src/config.rs b/web3-proxy/src/config.rs index 24b9787a..9b7a1a49 100644 --- a/web3-proxy/src/config.rs +++ b/web3-proxy/src/config.rs @@ -3,7 +3,7 @@ use governor::clock::QuantaClock; use serde::Deserialize; use std::collections::HashMap; use std::sync::Arc; -use tracing::instrument; +// use tracing::instrument; use crate::connection::Web3Connection; use crate::Web3ProxyApp; @@ -43,7 +43,7 @@ pub struct Web3ConnectionConfig { impl RpcConfig { /// Create a Web3ProxyApp from config - #[instrument(name = "try_build_RpcConfig", skip_all)] + // #[instrument(name = "try_build_RpcConfig", skip_all)] pub async fn try_build(self) -> anyhow::Result { let balanced_rpcs = self.balanced_rpcs.into_values().collect(); @@ -59,7 +59,7 @@ impl RpcConfig { impl Web3ConnectionConfig { /// Create a Web3Connection from config - #[instrument(name = "try_build_Web3ConnectionConfig", skip_all)] + // #[instrument(name = "try_build_Web3ConnectionConfig", skip_all)] pub async fn try_build( self, clock: &QuantaClock, diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index fbe202dc..0a714ca9 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -146,7 +146,7 @@ impl fmt::Debug for Web3Connections { } impl Web3Connections { - #[instrument(name = "try_new_Web3Connections", skip_all)] + // #[instrument(name = "try_new_Web3Connections", skip_all)] pub async fn try_new( chain_id: usize, servers: Vec, diff --git a/web3-proxy/src/main.rs b/web3-proxy/src/main.rs index dd6db68c..9ffc1978 100644 --- a/web3-proxy/src/main.rs +++ b/web3-proxy/src/main.rs @@ -22,7 +22,7 @@ use crate::config::{CliConfig, RpcConfig}; fn main() -> anyhow::Result<()> { // install global collector configured based on RUST_LOG env var. - // tracing_subscriber::fmt::init(); + // TODO: if RUST_LOG isn't set, set it to "web3_proxy=debug" or something console_subscriber::init(); let cli_config: CliConfig = argh::from_env();