From c8da98d12ef9758d21b346d0cc03b54a11819e34 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Fri, 9 Sep 2022 03:53:16 +0000
Subject: [PATCH] even more metrics

---
 web3_proxy/src/app.rs              | 38 ++++++++++++++++++--------
 web3_proxy/src/config.rs           |  3 ++
 web3_proxy/src/frontend/http.rs    |  6 ++--
 web3_proxy/src/rpcs/connection.rs  |  6 +++-
 web3_proxy/src/rpcs/connections.rs |  5 +++-
 web3_proxy/src/rpcs/request.rs     | 44 ++++++++++++++++++------------
 6 files changed, 67 insertions(+), 35 deletions(-)

diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs
index eda8912c..d0239ee5 100644
--- a/web3_proxy/src/app.rs
+++ b/web3_proxy/src/app.rs
@@ -8,6 +8,7 @@ use crate::jsonrpc::JsonRpcRequest;
 use crate::jsonrpc::JsonRpcRequestEnum;
 use crate::rpcs::blockchain::{ArcBlock, BlockHashesMap, BlockId};
 use crate::rpcs::connections::Web3Connections;
+use crate::rpcs::request::OpenRequestHandleMetrics;
 use crate::rpcs::transactions::TxStatus;
 use anyhow::Context;
 use axum::extract::ws::Message;
@@ -29,11 +30,12 @@ use redis_rate_limit::{
     RedisConnectionManager, RedisErrorSink, RedisPool, RedisRateLimit,
 };
 use sea_orm::DatabaseConnection;
+use serde::Serialize;
 use serde_json::json;
 use std::fmt;
 use std::pin::Pin;
 use std::str::FromStr;
-use std::sync::atomic::{self, AtomicU64, AtomicUsize};
+use std::sync::atomic::{self, AtomicUsize};
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::{broadcast, watch};
@@ -81,9 +83,8 @@ pub struct Web3ProxyApp {
     pub config: AppConfig,
     pub db_conn: Option<DatabaseConnection>,
     /// prometheus metrics
-    metrics: Arc<Web3ProxyAppMetrics>,
-    /// store pending queries so that we don't send the same request to our backends multiple times
-    pub total_queries: AtomicU64,
+    app_metrics: Arc<Web3ProxyAppMetrics>,
+    open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
     /// store pending transactions that we've seen so that we don't send duplicates to subscribers
     pub pending_transactions: Cache<TxHash, TxStatus>,
     pub frontend_rate_limiter: Option<RedisRateLimit>,
@@ -144,7 +145,7 @@ pub async fn get_migrated_db(
     Ok(db)
 }
 
-#[metered(registry = Web3ProxyAppMetrics)]
+#[metered(registry = Web3ProxyAppMetrics, registry_expr = self.app_metrics, visibility = pub)]
 impl Web3ProxyApp {
     pub async fn spawn(
         top_config: TopConfig,
@@ -159,6 +160,9 @@ impl Web3ProxyApp {
             "redirect user url must contain \"{{user_id}}\""
         );
 
+        let app_metrics = Default::default();
+        let open_request_handle_metrics: Arc<OpenRequestHandleMetrics> = Default::default();
+
         // first, we connect to mysql and make sure the latest migrations have run
         let db_conn = if let Some(db_url) = &top_config.app.db_url {
             let db_min_connections = top_config.app.db_min_connections.unwrap_or(num_workers);
@@ -262,6 +266,7 @@ impl Web3ProxyApp {
             top_config.app.min_synced_rpcs,
             Some(pending_tx_sender.clone()),
             pending_transactions.clone(),
+            open_request_handle_metrics.clone(),
         )
         .await
         .context("balanced rpcs")?;
@@ -288,6 +293,7 @@ impl Web3ProxyApp {
                 // TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits
                 None,
                 pending_transactions.clone(),
+                open_request_handle_metrics.clone(),
             )
             .await
             .context("private_rpcs")?;
@@ -308,7 +314,6 @@ impl Web3ProxyApp {
         });
 
         // TODO: change this to a sized cache
-        let total_queries = 0.into();
         let response_cache = Cache::new(10_000);
 
         let user_cache = Cache::new(10_000);
@@ -319,12 +324,12 @@
             response_cache,
             head_block_receiver,
             pending_tx_sender,
-            total_queries,
             pending_transactions,
             frontend_rate_limiter,
             db_conn,
             redis_pool,
-            metrics: Default::default(),
+            app_metrics,
+            open_request_handle_metrics,
             user_cache,
         };
 
@@ -340,7 +345,18 @@
         // TODO: what globals? should this be the hostname or what?
         // globals.insert("service", "web3_proxy");
 
-        let serialized = serde_prometheus::to_string(&self.metrics, Some("web3_proxy"), globals)?;
+        #[derive(Serialize)]
+        struct CombinedMetrics<'a> {
+            app: &'a Web3ProxyAppMetrics,
+            backend_rpc: &'a OpenRequestHandleMetrics,
+        }
+
+        let metrics = CombinedMetrics {
+            app: &self.app_metrics,
+            backend_rpc: &self.open_request_handle_metrics,
+        };
+
+        let serialized = serde_prometheus::to_string(&metrics, Some("web3_proxy"), globals)?;
 
         Ok(serialized)
     }
@@ -618,12 +634,10 @@
         // TODO: how much should we retry? probably with a timeout and not with a count like this
        // TODO: think more about this loop.
-        // // TODO: add more to this span such as
+        // TODO: add things to this span
        let span = info_span!("rpc_request");
        // let _enter = span.enter(); // DO NOT ENTER! we can't use enter across awaits! (clippy lint soon)

-        self.total_queries.fetch_add(1, atomic::Ordering::Relaxed);
-
        // TODO: don't clone
        let partial_response: serde_json::Value = match request.method.clone().as_ref() {
            // lots of commands are blocked
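
A note on the CombinedMetrics hunk above (an aside, not part of the patch): serde_prometheus serializes any nested Serialize value, turning struct nesting into a `path` label on the emitted metrics, so borrowing both registries into one wrapper struct is enough to expose them on a single scrape endpoint. Below is a minimal sketch of that flattening with hypothetical types (`Inner`, `Combined`); it assumes the serde_prometheus 0.1-era `to_string(value, namespace, globals)` signature that the hunk itself calls.

    use serde::Serialize;
    use std::collections::HashMap;

    // stand-in for a generated metrics registry (hypothetical)
    #[derive(Serialize)]
    struct Inner {
        hit_count: u64,
    }

    // same shape as CombinedMetrics in the hunk above
    #[derive(Serialize)]
    struct Combined<'a> {
        app: &'a Inner,
        backend_rpc: &'a Inner,
    }

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let app = Inner { hit_count: 7 };
        let backend_rpc = Inner { hit_count: 3 };

        let combined = Combined {
            app: &app,
            backend_rpc: &backend_rpc,
        };

        // expected to print lines like: web3_proxy_hit_count{path = "app"} 7
        let out = serde_prometheus::to_string(&combined, Some("web3_proxy"), HashMap::new())?;
        println!("{}", out);

        Ok(())
    }

Because the registries serialize like any other struct, the app only has to keep the two Arc'd registries around, which is presumably why the hand-rolled total_queries counter can be dropped in this commit.
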
diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs
index 03b23c7d..1f66755d 100644
--- a/web3_proxy/src/config.rs
+++ b/web3_proxy/src/config.rs
@@ -1,5 +1,6 @@
 use crate::rpcs::blockchain::BlockHashesMap;
 use crate::rpcs::connection::Web3Connection;
+use crate::rpcs::request::OpenRequestHandleMetrics;
 use crate::{app::AnyhowJoinHandle, rpcs::blockchain::ArcBlock};
 use argh::FromArgs;
 use derive_more::Constructor;
@@ -125,6 +126,7 @@ impl Web3ConnectionConfig {
         block_map: BlockHashesMap,
         block_sender: Option<flume::Sender<BlockAndRpc>>,
         tx_id_sender: Option<flume::Sender<TxHashAndRpc>>,
+        open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
     ) -> anyhow::Result<(Arc<Web3Connection>, AnyhowJoinHandle<()>)> {
         let hard_limit = match (self.hard_limit, redis_pool) {
             (None, None) => None,
@@ -156,6 +158,7 @@ impl Web3ConnectionConfig {
             tx_id_sender,
             true,
             self.weight,
+            open_request_handle_metrics,
         )
         .await
     }
diff --git a/web3_proxy/src/frontend/http.rs b/web3_proxy/src/frontend/http.rs
index 8ecb77fe..b2d305e5 100644
--- a/web3_proxy/src/frontend/http.rs
+++ b/web3_proxy/src/frontend/http.rs
@@ -1,8 +1,6 @@
 use crate::app::Web3ProxyApp;
-use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
-use moka::future::ConcurrentCacheExt;
-use serde_json::json;
-use std::sync::{atomic, Arc};
+use axum::{http::StatusCode, response::IntoResponse, Extension};
+use std::sync::Arc;
 use tracing::instrument;
 
 /// Health check page for load balancers to use
diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs
index 9feba2c1..18644a5d 100644
--- a/web3_proxy/src/rpcs/connection.rs
+++ b/web3_proxy/src/rpcs/connection.rs
@@ -1,7 +1,7 @@
 ///! Rate-limited communication with a web3 provider.
 use super::blockchain::{ArcBlock, BlockHashesMap, BlockId};
 use super::provider::Web3Provider;
-use super::request::{OpenRequestHandle, OpenRequestResult};
+use super::request::{OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult};
 use crate::app::{flatten_handle, AnyhowJoinHandle};
 use crate::config::BlockAndRpc;
 use anyhow::Context;
@@ -30,6 +30,7 @@ pub struct Web3Connection {
     pub(super) active_requests: AtomicU32,
     /// keep track of total requests
     /// TODO: is this type okay?
+    /// TODO: replace this with something in metered?
     pub(super) total_requests: AtomicU64,
     /// provider is in a RwLock so that we can replace it if re-connecting
     /// it is an async lock because we hold it open across awaits
@@ -44,6 +45,7 @@ pub struct Web3Connection {
     pub(super) weight: u32,
     // TODO: async lock?
     pub(super) head_block_id: RwLock<Option<BlockId>>,
+    pub(super) open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
 }
 
 impl Web3Connection {
@@ -67,6 +69,7 @@ impl Web3Connection {
         tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
         reconnect: bool,
         weight: u32,
+        open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
     ) -> anyhow::Result<(Arc<Web3Connection>, AnyhowJoinHandle<()>)> {
         let hard_limit = hard_limit.map(|(hard_rate_limit, redis_conection)| {
             // TODO: allow configurable period and max_burst
@@ -92,6 +95,7 @@ impl Web3Connection {
             block_data_limit: Default::default(),
             head_block_id: RwLock::new(Default::default()),
             weight,
+            open_request_handle_metrics,
         };
 
         let new_connection = Arc::new(new_connection);
diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs
index 6f7d8a6d..de9b779a 100644
--- a/web3_proxy/src/rpcs/connections.rs
+++ b/web3_proxy/src/rpcs/connections.rs
@@ -1,7 +1,7 @@
 ///! Load balanced communication with a group of web3 providers
 use super::blockchain::{ArcBlock, BlockHashesMap};
 use super::connection::Web3Connection;
-use super::request::{OpenRequestHandle, OpenRequestResult};
+use super::request::{OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult};
 use super::synced_connections::SyncedConnections;
 use crate::app::{flatten_handle, AnyhowJoinHandle};
 use crate::config::{BlockAndRpc, TxHashAndRpc, Web3ConnectionConfig};
@@ -63,6 +63,7 @@ impl Web3Connections {
         min_synced_rpcs: usize,
         pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
         pending_transactions: Cache<TxHash, TxStatus>,
+        open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
     ) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
         let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
         let (block_sender, block_receiver) = flume::unbounded::<BlockAndRpc>();
@@ -119,6 +120,7 @@ impl Web3Connections {
                 let pending_tx_id_sender = Some(pending_tx_id_sender.clone());
                 let block_map = block_map.clone();
+                let open_request_handle_metrics = open_request_handle_metrics.clone();
 
                 tokio::spawn(async move {
                     server_config
@@ -131,6 +133,7 @@ impl Web3Connections {
                             block_map,
                             block_sender,
                             pending_tx_id_sender,
+                            open_request_handle_metrics,
                         )
                         .await
                 })
diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs
index b7dd7804..ab7b9e40 100644
--- a/web3_proxy/src/rpcs/request.rs
+++ b/web3_proxy/src/rpcs/request.rs
@@ -1,6 +1,11 @@
 use super::connection::Web3Connection;
 use super::provider::Web3Provider;
-// use metered::{measure, ErrorCount, HitCount, InFlight, ResponseTime, Throughput};
+use metered::metered;
+use metered::ErrorCount;
+use metered::HitCount;
+use metered::InFlight;
+use metered::ResponseTime;
+use metered::Throughput;
 use std::fmt;
 use std::sync::atomic;
 use std::sync::Arc;
@@ -19,34 +24,39 @@ pub enum OpenRequestResult {
 /// Make RPC requests through this handle and drop it when you are done.
 #[derive(Debug)]
-pub struct OpenRequestHandle(Arc<Web3Connection>);
+pub struct OpenRequestHandle {
+    conn: Arc<Web3Connection>,
+    // TODO: this is the same metrics on the conn. use a reference
+    metrics: Arc<OpenRequestHandleMetrics>,
+}
 
+#[metered(registry = OpenRequestHandleMetrics, visibility = pub)]
 impl OpenRequestHandle {
-    pub fn new(connection: Arc<Web3Connection>) -> Self {
+    pub fn new(conn: Arc<Web3Connection>) -> Self {
         // TODO: attach a unique id to this? customer requests have one, but not internal queries
         // TODO: what ordering?!
-        connection
-            .active_requests
-            .fetch_add(1, atomic::Ordering::AcqRel);
+        // TODO: should we be using metered, or not? i think not because we want stats for each handle
+        // TODO: these should maybe be sent to an influxdb instance?
+        conn.active_requests.fetch_add(1, atomic::Ordering::AcqRel);
 
         // TODO: handle overflows?
         // TODO: what ordering?
-        connection
-            .total_requests
-            .fetch_add(1, atomic::Ordering::Relaxed);
+        conn.total_requests.fetch_add(1, atomic::Ordering::Relaxed);
 
-        Self(connection)
+        let metrics = conn.open_request_handle_metrics.clone();
+
+        Self { conn, metrics }
     }
 
     pub fn clone_connection(&self) -> Arc<Web3Connection> {
-        self.0.clone()
+        self.conn.clone()
     }
 
     /// Send a web3 request
     /// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented
     /// By taking self here, we ensure that this is dropped after the request is complete
-    // #[measure([ErrorCount, HitCount, InFlight, ResponseTime, Throughput])]
     #[instrument(skip_all)]
+    #[measure([ErrorCount, HitCount, InFlight, ResponseTime, Throughput])]
     pub async fn request<T, R>(
         &self,
         method: &str,
@@ -59,14 +69,14 @@ impl OpenRequestHandle {
         // TODO: use tracing spans properly
         // TODO: requests from customers have request ids, but we should add
         // TODO: including params in this is way too verbose
-        trace!(rpc=%self.0, %method, "request");
+        trace!(rpc=%self.conn, %method, "request");
 
         let mut provider = None;
 
         while provider.is_none() {
-            match self.0.provider.read().await.as_ref() {
+            match self.conn.provider.read().await.as_ref() {
                 None => {
-                    warn!(rpc=%self.0, "no provider!");
+                    warn!(rpc=%self.conn, "no provider!");
                     // TODO: how should this work? a reconnect should be in progress. but maybe force one now?
                     // TODO: sleep how long? subscribe to something instead?
                     sleep(Duration::from_millis(100)).await
@@ -83,7 +93,7 @@ impl OpenRequestHandle {
         // TODO: i think ethers already has trace logging (and does it much more fancy)
         // TODO: at least instrument this with more useful information
         // trace!(rpc=%self.0, %method, ?response);
-        trace!(rpc=%self.0, %method, "response");
+        trace!(rpc=%self.conn, %method, "response");
 
         response
     }
@@ -91,7 +101,7 @@
 impl Drop for OpenRequestHandle {
     fn drop(&mut self) {
-        self.0
+        self.conn
             .active_requests
             .fetch_sub(1, atomic::Ordering::AcqRel);
     }
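
For readers unfamiliar with the metered crate used throughout this patch: #[metered] on an impl block generates the named registry struct (here OpenRequestHandleMetrics), and each #[measure([...])] attribute instantiates the listed metrics for that method; the registry is plain data that serde can serialize, which is what the CombinedMetrics code in app.rs relies on. A self-contained sketch of the pattern, using hypothetical names (Backend, BackendMetrics) rather than the patch's types:

    use metered::{metered, ErrorCount, HitCount, ResponseTime, Throughput};

    #[derive(Default, Debug)]
    pub struct Backend {
        // the registry generated by #[metered] below; the patch instead shares
        // its registry between handles via Arc<OpenRequestHandleMetrics>
        metrics: BackendMetrics,
    }

    // registry_expr defaults to self.metrics; spelled out here to mirror app.rs
    #[metered(registry = BackendMetrics, registry_expr = self.metrics, visibility = pub)]
    impl Backend {
        // ErrorCount increments on Err returns; ResponseTime and Throughput
        // time every call through the generated wrappers
        #[measure([ErrorCount, HitCount, ResponseTime, Throughput])]
        pub fn send(&self, ok: bool) -> Result<(), ()> {
            if ok {
                Ok(())
            } else {
                Err(())
            }
        }
    }

    fn main() {
        let backend = Backend::default();
        let _ = backend.send(true);
        let _ = backend.send(false);

        // after two calls: hit_count = 2, error_count = 1, plus latency
        // histograms; serializing this registry with serde_prometheus, as the
        // app.rs hunk does, turns it into scrapeable Prometheus text
        println!("{:#?}", backend.metrics);
    }

Sharing one registry across every handle (the Arc field on Web3Connection) means these counters aggregate over all backend requests to that connection; the TODO above about wanting "stats for each handle" notes exactly that tradeoff.
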