From 2c41cad4526eee5cb10be0b5a67795ab42666ade Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 7 Sep 2022 04:11:47 +0000 Subject: [PATCH] more counts to the status page --- web3_proxy/src/app.rs | 16 +++++----------- web3_proxy/src/frontend/http.rs | 8 +++++--- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index d98a4546..3070a7ea 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -35,7 +35,7 @@ use std::str::FromStr; use std::sync::atomic::{self, AtomicU64, AtomicUsize}; use std::sync::Arc; use std::time::Duration; -use tokio::sync::{broadcast, watch, Notify}; +use tokio::sync::{broadcast, watch}; use tokio::task::JoinHandle; use tokio::time::{timeout, Instant}; use tokio_stream::wrappers::{BroadcastStream, WatchStream}; @@ -52,13 +52,8 @@ static APP_USER_AGENT: &str = concat!( /// block hash, method, params // TODO: better name -type Web3QueryCacheKey = (H256, String, Option); - -/// wait on this to -type ResponseCacheReady = Arc; - -type RequestCache = Cache; -type ResponseCache = Cache; +type ResponseCacheKey = (H256, String, Option); +type ResponseCache = Cache; pub type AnyhowJoinHandle = JoinHandle>; @@ -86,7 +81,6 @@ pub struct Web3ProxyApp { pub db_conn: Option, /// store pending queries so that we don't send the same request to our backends multiple times pub total_queries: AtomicU64, - pub active_queries: RequestCache, /// store pending transactions that we've seen so that we don't send duplicates to subscribers pub pending_transactions: Cache, pub frontend_rate_limiter: Option, @@ -312,7 +306,6 @@ impl Web3ProxyApp { // TODO: change this to a sized cache let total_queries = 0.into(); - let active_queries = Cache::new(10_000); let response_cache = Cache::new(10_000); let user_cache = Cache::new(10_000); @@ -324,7 +317,6 @@ impl Web3ProxyApp { head_block_receiver, pending_tx_sender, total_queries, - active_queries, pending_transactions, frontend_rate_limiter, db_conn, @@ -612,6 +604,8 @@ impl Web3ProxyApp { let span = info_span!("rpc_request"); // let _enter = span.enter(); // DO NOT ENTER! we can't use enter across awaits! (clippy lint soon) + self.total_queries.fetch_add(1, atomic::Ordering::Relaxed); + // TODO: don't clone let partial_response: serde_json::Value = match request.method.clone().as_ref() { // lots of commands are blocked diff --git a/web3_proxy/src/frontend/http.rs b/web3_proxy/src/frontend/http.rs index 7efca991..1a909ac3 100644 --- a/web3_proxy/src/frontend/http.rs +++ b/web3_proxy/src/frontend/http.rs @@ -1,5 +1,6 @@ use crate::app::Web3ProxyApp; use axum::{http::StatusCode, response::IntoResponse, Extension, Json}; +use moka::future::ConcurrentCacheExt; use serde_json::json; use std::sync::{atomic, Arc}; @@ -16,10 +17,11 @@ pub async fn health(Extension(app): Extension>) -> impl IntoRe /// TODO: replace this with proper stats and monitoring pub async fn status(Extension(app): Extension>) -> impl IntoResponse { // TODO: what else should we include? uptime? + app.pending_transactions.sync(); + app.user_cache.sync(); + let body = json!({ - "total_queries": app.total_queries.load(atomic::Ordering::Relaxed), - "active_queries_count": app.active_queries.entry_count(), - "active_queries_size": app.active_queries.weighted_size(), + "total_queries": app.total_queries.load(atomic::Ordering::Acquire), "pending_transactions_count": app.pending_transactions.entry_count(), "pending_transactions_size": app.pending_transactions.weighted_size(), "user_cache_count": app.user_cache.entry_count(),