more counts to the status page
parent c9b2c0c0d2
commit 2c41cad452
@@ -35,7 +35,7 @@ use std::str::FromStr;
 use std::sync::atomic::{self, AtomicU64, AtomicUsize};
 use std::sync::Arc;
 use std::time::Duration;
-use tokio::sync::{broadcast, watch, Notify};
+use tokio::sync::{broadcast, watch};
 use tokio::task::JoinHandle;
 use tokio::time::{timeout, Instant};
 use tokio_stream::wrappers::{BroadcastStream, WatchStream};
@@ -52,13 +52,8 @@ static APP_USER_AGENT: &str = concat!(
 
 /// block hash, method, params
 // TODO: better name
-type Web3QueryCacheKey = (H256, String, Option<String>);
-
-/// wait on this to
-type ResponseCacheReady = Arc<Notify>;
-
-type RequestCache = Cache<Web3QueryCacheKey, (u64, ResponseCacheReady)>;
-type ResponseCache = Cache<Web3QueryCacheKey, JsonRpcForwardedResponse>;
+type ResponseCacheKey = (H256, String, Option<String>);
+type ResponseCache = Cache<ResponseCacheKey, JsonRpcForwardedResponse>;
 
 pub type AnyhowJoinHandle<T> = JoinHandle<anyhow::Result<T>>;
 
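For context on the new aliases, here is a rough sketch of how a response cache keyed this way might be consulted. The `CachedResponse` type and `cached_call` helper are hypothetical names invented for this sketch (the commit itself caches `JsonRpcForwardedResponse` values), and it assumes the older moka API in use here, where `future::Cache::get` is synchronous:

```rust
use ethers::types::H256;
use moka::future::Cache;

/// Mirrors the aliases above: a response is cached per (block hash, method, params).
type ResponseCacheKey = (H256, String, Option<String>);

/// Stand-in for JsonRpcForwardedResponse; illustrative only.
#[derive(Clone)]
struct CachedResponse(String);

/// Hypothetical helper: answer from the cache when the exact same query has
/// already been served at this block, otherwise forward it and remember the result.
async fn cached_call(
    cache: &Cache<ResponseCacheKey, CachedResponse>,
    block_hash: H256,
    method: &str,
    params: Option<String>,
) -> CachedResponse {
    let key = (block_hash, method.to_string(), params);

    if let Some(hit) = cache.get(&key) {
        return hit;
    }

    // Placeholder for actually forwarding the request to a backend RPC.
    let response = CachedResponse(format!("forwarded {}", key.1));
    cache.insert(key, response.clone()).await;
    response
}
```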
@@ -86,7 +81,6 @@ pub struct Web3ProxyApp {
     pub db_conn: Option<sea_orm::DatabaseConnection>,
     /// store pending queries so that we don't send the same request to our backends multiple times
     pub total_queries: AtomicU64,
-    pub active_queries: RequestCache,
     /// store pending transactions that we've seen so that we don't send duplicates to subscribers
     pub pending_transactions: Cache<TxHash, TxStatus>,
     pub frontend_rate_limiter: Option<RedisRateLimit>,
@@ -312,7 +306,6 @@ impl Web3ProxyApp {
 
         // TODO: change this to a sized cache
         let total_queries = 0.into();
-        let active_queries = Cache::new(10_000);
        let response_cache = Cache::new(10_000);
         let user_cache = Cache::new(10_000);
 
@@ -324,7 +317,6 @@ impl Web3ProxyApp {
             head_block_receiver,
             pending_tx_sender,
             total_queries,
-            active_queries,
             pending_transactions,
             frontend_rate_limiter,
             db_conn,
@@ -612,6 +604,8 @@ impl Web3ProxyApp {
         let span = info_span!("rpc_request");
         // let _enter = span.enter(); // DO NOT ENTER! we can't use enter across awaits! (clippy lint soon)
 
+        self.total_queries.fetch_add(1, atomic::Ordering::Relaxed);
+
         // TODO: don't clone
         let partial_response: serde_json::Value = match request.method.clone().as_ref() {
             // lots of commands are blocked
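The added `fetch_add` is one half of a simple lock-free counter; the other half is the load in the status handler below. A self-contained sketch of that pattern (the `Counters` type and method names are invented here, not taken from the commit):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Illustrative stand-in for the counter field on Web3ProxyApp.
struct Counters {
    total_queries: AtomicU64,
}

impl Counters {
    /// Hot path: one relaxed increment per proxied request. A lone counter has
    /// no other memory to synchronize with, so Relaxed is sufficient here.
    fn record_query(&self) {
        self.total_queries.fetch_add(1, Ordering::Relaxed);
    }

    /// Status page: read a snapshot of the value. The commit uses Acquire for
    /// this load; for a counter that is only read for display, Relaxed would
    /// observe the same values.
    fn snapshot(&self) -> u64 {
        self.total_queries.load(Ordering::Acquire)
    }
}

fn main() {
    let counters = Counters {
        total_queries: AtomicU64::new(0),
    };
    counters.record_query();
    assert_eq!(counters.snapshot(), 1);
}
```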
@@ -1,5 +1,6 @@
 use crate::app::Web3ProxyApp;
 use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
+use moka::future::ConcurrentCacheExt;
 use serde_json::json;
 use std::sync::{atomic, Arc};
 
@@ -16,10 +17,11 @@ pub async fn health(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
 /// TODO: replace this with proper stats and monitoring
 pub async fn status(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
     // TODO: what else should we include? uptime?
+    app.pending_transactions.sync();
+    app.user_cache.sync();
+
     let body = json!({
-        "total_queries": app.total_queries.load(atomic::Ordering::Relaxed),
-        "active_queries_count": app.active_queries.entry_count(),
-        "active_queries_size": app.active_queries.weighted_size(),
+        "total_queries": app.total_queries.load(atomic::Ordering::Acquire),
         "pending_transactions_count": app.pending_transactions.entry_count(),
         "pending_transactions_size": app.pending_transactions.weighted_size(),
         "user_cache_count": app.user_cache.entry_count(),
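The `sync()` calls are the notable addition here: moka maintains its counters lazily, so flushing pending housekeeping makes `entry_count()` and `weighted_size()` reflect recent activity before they are serialized into the status body. A minimal sketch of that pattern with a stand-alone cache, assuming the moka version used here (where `ConcurrentCacheExt::sync` exists); the `cache_stats` helper is invented for illustration:

```rust
use moka::future::{Cache, ConcurrentCacheExt};
use serde_json::{json, Value};

/// Illustrative helper; the commit does this inline for pending_transactions
/// and user_cache rather than through a function like this.
fn cache_stats(cache: &Cache<u64, String>) -> Value {
    // moka applies inserts/evictions to its internal bookkeeping lazily, so
    // flush pending housekeeping first; otherwise entry_count() can lag.
    cache.sync();

    json!({
        // Number of entries currently tracked by the cache.
        "count": cache.entry_count(),
        // Total weighted size; with no weigher configured this tracks the entry count.
        "size": cache.weighted_size(),
    })
}

#[tokio::main]
async fn main() {
    let cache: Cache<u64, String> = Cache::new(10_000);
    cache.insert(1, "one".to_string()).await;
    println!("{}", cache_stats(&cache));
}
```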