From 307062d8d245926e80ac52454986629d86725d8e Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 20 May 2022 22:16:15 +0000 Subject: [PATCH] basic status page --- web3-proxy/src/app.rs | 16 +++++++++++++++- web3-proxy/src/connection.rs | 24 ++++++++++++++++++++++++ web3-proxy/src/connections.rs | 20 +++++++++++++++++++- web3-proxy/src/frontend.rs | 28 +++++++++++++++++++++++----- 4 files changed, 81 insertions(+), 7 deletions(-) diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index 20627bc5..9d497f66 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -34,6 +34,8 @@ type CacheKey = (H256, String, Option); type ResponseLruCache = RwLock>; +type ActiveRequestsMap = DashMap>; + /// The application // TODO: this debug impl is way too verbose. make something smaller // TODO: if Web3ProxyApp is always in an Arc, i think we can avoid having at least some of these internal things in arcs @@ -45,7 +47,7 @@ pub struct Web3ProxyApp { balanced_rpcs: Arc, /// Send private requests (like eth_sendRawTransaction) to all these servers private_rpcs: Arc, - active_requests: DashMap>, + active_requests: ActiveRequestsMap, response_cache: ResponseLruCache, } @@ -104,6 +106,18 @@ impl Web3ProxyApp { }) } + pub fn get_balanced_rpcs(&self) -> &Web3Connections { + &self.balanced_rpcs + } + + pub fn get_private_rpcs(&self) -> &Web3Connections { + &self.private_rpcs + } + + pub fn get_active_requests(&self) -> &ActiveRequestsMap { + &self.active_requests + } + /// send the request to the approriate RPCs /// TODO: dry this up #[instrument(skip_all)] diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index a7b922f4..4f7b6b30 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -7,6 +7,8 @@ use governor::middleware::NoOpMiddleware; use governor::state::{InMemoryState, NotKeyed}; use governor::NotUntil; use governor::RateLimiter; +use serde::ser::{SerializeStruct, Serializer}; +use serde::Serialize; use std::fmt; use std::num::NonZeroU32; use std::sync::atomic::{self, AtomicU32}; @@ -78,8 +80,30 @@ pub struct Web3Connection { soft_limit: u32, /// the same clock that is used by the rate limiter clock: QuantaClock, + // TODO: track total number of requests? } +impl Serialize for Web3Connection { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + // 3 is the number of fields in the struct. + let mut state = serializer.serialize_struct("Web3Connection", 1)?; + + // TODO: sanitize any credentials in the url + state.serialize_field("url", &self.url)?; + + state.serialize_field("soft_limit", &self.soft_limit)?; + + state.serialize_field( + "active_requests", + &self.active_requests.load(atomic::Ordering::Acquire), + )?; + + state.end() + } +} impl fmt::Debug for Web3Connection { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Web3Connection") diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index 244e6cd3..05c93dff 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -8,6 +8,8 @@ use futures::StreamExt; use governor::clock::{QuantaClock, QuantaInstant}; use governor::NotUntil; use hashbrown::HashMap; +use serde::ser::{SerializeStruct, Serializer}; +use serde::Serialize; use serde_json::value::RawValue; use std::cmp; use std::collections::{BTreeMap, BTreeSet}; @@ -20,7 +22,8 @@ use tracing::{info, info_span, instrument, trace, warn}; use crate::config::Web3ConnectionConfig; use crate::connection::{ActiveRequestHandle, Web3Connection}; -#[derive(Clone, Default)] +// Serialize so we can print it on our debug endpoint +#[derive(Clone, Default, Serialize)] struct SyncedConnections { head_block_num: u64, head_block_hash: H256, @@ -47,6 +50,21 @@ pub struct Web3Connections { synced_connections: ArcSwap, } +impl Serialize for Web3Connections { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let inner: Vec<&Web3Connection> = self.inner.iter().map(|x| x.as_ref()).collect(); + + // 3 is the number of fields in the struct. + let mut state = serializer.serialize_struct("Web3Connections", 2)?; + state.serialize_field("rpcs", &inner)?; + state.serialize_field("synced_connections", &**self.synced_connections.load())?; + state.end() + } +} + impl fmt::Debug for Web3Connections { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // TODO: the default formatter takes forever to write. this is too quiet though diff --git a/web3-proxy/src/frontend.rs b/web3-proxy/src/frontend.rs index 40ef4e0c..1a58d29b 100644 --- a/web3-proxy/src/frontend.rs +++ b/web3-proxy/src/frontend.rs @@ -9,6 +9,7 @@ use axum::{ Json, Router, }; +use serde_json::json; use serde_json::value::RawValue; use std::net::SocketAddr; use std::sync::Arc; @@ -52,17 +53,32 @@ async fn root() -> impl IntoResponse { // TODO: i can't get https://docs.rs/axum/latest/axum/error_handling/index.html to work async fn proxy_web3_rpc( payload: Json, - state: Extension>, + app: Extension>, ) -> impl IntoResponse { - match state.0.proxy_web3_rpc(payload.0).await { + match app.0.proxy_web3_rpc(payload.0).await { Ok(response) => (StatusCode::OK, serde_json::to_string(&response).unwrap()), Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e)), } } -/// Status page -async fn status(state: Extension>) -> impl IntoResponse { - (StatusCode::INTERNAL_SERVER_ERROR, "Hello, list_rpcs!") +/// Very basic status page +async fn status(app: Extension>) -> impl IntoResponse { + let app = app.0.as_ref(); + + let balanced_rpcs = app.get_balanced_rpcs(); + + let private_rpcs = app.get_private_rpcs(); + + let num_active_requests = app.get_active_requests().len(); + + // TODO: what else should we include? uptime? prometheus? + let body = json!({ + "balanced_rpcs": balanced_rpcs, + "private_rpcs": private_rpcs, + "num_active_requests": num_active_requests, + }); + + (StatusCode::INTERNAL_SERVER_ERROR, body.to_string()) } async fn handler_404() -> impl IntoResponse { @@ -93,3 +109,5 @@ async fn _handle_anyhow_error(err: anyhow::Error) -> impl IntoResponse { serde_json::to_string(&err).unwrap(), ) } + +// i think we want a custom result type. it has an anyhow result inside. it impl IntoResponse