From 9213e1a796bdefc8ddddd103fe2f307c58893e74 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 17 May 2022 00:56:56 +0000 Subject: [PATCH] instrument --- TODO.md | 2 +- web3-proxy/src/app.rs | 13 +++++++++++-- web3-proxy/src/config.rs | 3 +++ web3-proxy/src/connection.rs | 23 +++++++++++++++++++---- web3-proxy/src/connections.rs | 6 +++++- web3-proxy/src/jsonrpc.rs | 2 +- web3-proxy/src/main.rs | 4 +++- 7 files changed, 43 insertions(+), 10 deletions(-) diff --git a/TODO.md b/TODO.md index c82d3989..619ccbf1 100644 --- a/TODO.md +++ b/TODO.md @@ -9,9 +9,9 @@ - [ ] endpoint for health checks. if no synced servers, give a 502 error - [ ] move from warp to auxm? - [ ] some production configs are occassionally stuck waiting at 100% cpu - - looks like its getting stuck on `futex(0x7fc15067b478, FUTEX_WAIT_PRIVATE, 1, NULL` - they stop processing new blocks. i'm guessing 2 blocks arrive at the same time, but i thought our locks would handle that - even after removing a bunch of the locks, the deadlock still happens. i can't reliably reproduce. i just let it run for awhile and it happens. + - running gdb shows that the tokio tungstenite thread is spinning near 100% cpu and none of the rest of the program is proceeding - [ ] proper logging with useful instrumentation - [ ] handle websocket disconnect and reconnect - [ ] warning if no blocks for too long. maybe reconnect automatically? 
diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index 34e149b0..b50ef5a7 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::watch; use tokio::time::sleep; -use tracing::{trace, warn}; +use tracing::{debug, instrument, trace, warn}; static APP_USER_AGENT: &str = concat!( "satoshiandkin/", @@ -56,6 +56,7 @@ impl fmt::Debug for Web3ProxyApp { } impl Web3ProxyApp { + #[instrument(skip_all)] pub async fn try_new( chain_id: usize, balanced_rpcs: Vec, @@ -102,11 +103,12 @@ impl Web3ProxyApp { /// send the request to the approriate RPCs /// TODO: dry this up + #[instrument(skip_all)] pub async fn proxy_web3_rpc( self: Arc, request: JsonRpcRequestEnum, ) -> anyhow::Result { - trace!("Received request: {:?}", request); + debug!("Received request: {:?}", request); let response = match request { JsonRpcRequestEnum::Single(request) => { @@ -117,9 +119,12 @@ impl Web3ProxyApp { } }; + debug!("Forwarding response: {:?}", response); + Ok(warp::reply::json(&response)) } + #[instrument(skip_all)] async fn proxy_web3_rpc_requests( self: Arc, requests: Vec, @@ -149,6 +154,7 @@ impl Web3ProxyApp { Ok(collected) } + #[instrument(skip_all)] async fn proxy_web3_rpc_request( self: Arc, request: JsonRpcRequest, @@ -234,6 +240,9 @@ impl Web3ProxyApp { if let Some(cached) = self.response_cache.read().get(&cache_key) { let _ = self.active_requests.remove(&cache_key); + // TODO: emit a stat + trace!("cache hit!"); + return Ok(cached.to_owned()); } diff --git a/web3-proxy/src/config.rs b/web3-proxy/src/config.rs index f92d339a..a88058d3 100644 --- a/web3-proxy/src/config.rs +++ b/web3-proxy/src/config.rs @@ -3,6 +3,7 @@ use governor::clock::QuantaClock; use serde::Deserialize; use std::collections::HashMap; use std::sync::Arc; +use tracing::instrument; use crate::connection::Web3Connection; use crate::Web3ProxyApp; @@ -42,6 +43,7 @@ pub struct Web3ConnectionConfig { impl RpcConfig { /// Create 
a Web3ProxyApp from config + #[instrument(skip_all)] pub async fn try_build(self) -> anyhow::Result { let balanced_rpcs = self.balanced_rpcs.into_values().collect(); @@ -57,6 +59,7 @@ impl RpcConfig { impl Web3ConnectionConfig { /// Create a Web3Connection from config + #[instrument(skip_all)] pub async fn try_build( self, clock: &QuantaClock, diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index 90809b81..3553c42a 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -7,13 +7,14 @@ use governor::middleware::NoOpMiddleware; use governor::state::{InMemoryState, NotKeyed}; use governor::NotUntil; use governor::RateLimiter; +use parking_lot::RwLock; use std::fmt; use std::num::NonZeroU32; use std::sync::atomic::{self, AtomicU32}; use std::time::Duration; use std::{cmp::Ordering, sync::Arc}; -use tokio::time::{interval, sleep, MissedTickBehavior}; -use tracing::{info, trace, warn}; +use tokio::time::{interval, sleep, timeout_at, Instant, MissedTickBehavior}; +use tracing::{info, instrument, trace, warn}; type Web3RateLimiter = RateLimiter>; @@ -38,6 +39,7 @@ pub struct Web3Connection { url: String, /// keep track of currently open requests. 
We sort on this active_requests: AtomicU32, + // TODO: put this in a RwLock so that we can replace it if re-connecting provider: Web3Provider, ratelimiter: Option, /// used for load balancing to the least loaded server @@ -61,7 +63,11 @@ impl fmt::Display for Web3Connection { } impl Web3Connection { + #[instrument(skip_all)] + async fn reconnect(&self) {} + /// Connect to a web3 rpc and subscribe to new heads + #[instrument(skip_all)] pub async fn try_new( chain_id: usize, url_str: String, @@ -164,6 +170,7 @@ impl Web3Connection { &self.url } + #[instrument(skip_all)] async fn send_block( self: &Arc, block: Result, ProviderError>, @@ -187,7 +194,7 @@ impl Web3Connection { } /// Subscribe to new blocks - // #[instrument] + #[instrument(skip_all)] pub async fn subscribe_new_heads( self: Arc, block_sender: flume::Sender<(u64, H256, Arc)>, @@ -260,9 +267,15 @@ impl Web3Connection { self.send_block(block, &block_sender).await; - while let Some(new_block) = stream.next().await { + // TODO: what should this timeout be? needs to be larger than worst case block time + // TODO: although reconnects will make this less of an issue + while let Ok(Some(new_block)) = + timeout_at(Instant::now() + Duration::from_secs(300), stream.next()).await + { self.send_block(Ok(new_block), &block_sender).await; } + + // TODO: re-connect! 
} } @@ -271,6 +284,7 @@ impl Web3Connection { Ok(()) } + #[instrument(skip_all)] pub async fn wait_for_request_handle(self: &Arc) -> ActiveRequestHandle { // TODO: maximum wait time @@ -331,6 +345,7 @@ impl ActiveRequestHandle { /// Send a web3 request /// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented /// By taking self here, we ensure that this is dropped after the request is complete + #[instrument(skip_all)] pub async fn request( &self, method: &str, diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index 24f633a4..eb3e3d2a 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -11,7 +11,7 @@ use serde_json::value::RawValue; use std::cmp; use std::fmt; use std::sync::Arc; -use tracing::{info, trace, warn}; +use tracing::{info, instrument, trace, warn}; use crate::config::Web3ConnectionConfig; use crate::connection::{ActiveRequestHandle, Web3Connection}; @@ -140,6 +140,7 @@ impl fmt::Debug for Web3Connections { } impl Web3Connections { + #[instrument(skip_all)] pub async fn try_new( chain_id: usize, servers: Vec, @@ -217,6 +218,7 @@ impl Web3Connections { } /// Send the same request to all the handles. Returning the fastest successful result. + #[instrument(skip_all)] pub async fn try_send_parallel_requests( self: Arc, active_request_handles: Vec, @@ -280,6 +282,7 @@ impl Web3Connections { } /// TODO: possible dead lock here. investigate more. 
probably refactor + #[instrument(skip_all)] async fn update_synced_rpcs( &self, block_receiver: flume::Receiver<(u64, H256, Arc)>, @@ -305,6 +308,7 @@ impl Web3Connections { } /// get the best available rpc server + #[instrument(skip_all)] pub async fn next_upstream_server( &self, ) -> Result>> { diff --git a/web3-proxy/src/jsonrpc.rs b/web3-proxy/src/jsonrpc.rs index 752aa8fb..43956964 100644 --- a/web3-proxy/src/jsonrpc.rs +++ b/web3-proxy/src/jsonrpc.rs @@ -161,7 +161,7 @@ impl fmt::Debug for JsonRpcForwardedResponse { } /// JSONRPC Responses can include one or many response objects. -#[derive(Clone, Serialize)] +#[derive(Clone, Debug, Serialize)] #[serde(untagged)] pub enum JsonRpcForwardedResponseEnum { Single(JsonRpcForwardedResponse), diff --git a/web3-proxy/src/main.rs b/web3-proxy/src/main.rs index 7add1723..79f693c3 100644 --- a/web3-proxy/src/main.rs +++ b/web3-proxy/src/main.rs @@ -13,7 +13,7 @@ use std::sync::Arc; use std::thread; use std::time::Duration; use tokio::runtime; -use tracing::info; +use tracing::{info, warn}; use warp::Filter; use warp::Reply; @@ -96,6 +96,8 @@ fn handle_anyhow_errors( match res { Ok(r) => r.into_response(), Err(e) => { + warn!("Responding with an error: {:?}", e); + let e = JsonRpcForwardedResponse { jsonrpc: "2.0".to_string(), // TODO: what id can we use? how do we make sure the incoming id gets attached to this?