basic status page

This commit is contained in:
Bryan Stitt 2022-05-20 22:16:15 +00:00
parent ac4c60d6c9
commit 307062d8d2
4 changed files with 81 additions and 7 deletions

@ -34,6 +34,8 @@ type CacheKey = (H256, String, Option<String>);
type ResponseLruCache = RwLock<LinkedHashMap<CacheKey, JsonRpcForwardedResponse>>;
type ActiveRequestsMap = DashMap<CacheKey, watch::Receiver<bool>>;
/// The application
// TODO: this debug impl is way too verbose. make something smaller
// TODO: if Web3ProxyApp is always in an Arc, i think we can avoid having at least some of these internal things in arcs
@ -45,7 +47,7 @@ pub struct Web3ProxyApp {
balanced_rpcs: Arc<Web3Connections>,
/// Send private requests (like eth_sendRawTransaction) to all these servers
private_rpcs: Arc<Web3Connections>,
active_requests: DashMap<CacheKey, watch::Receiver<bool>>,
active_requests: ActiveRequestsMap,
response_cache: ResponseLruCache,
}
@ -104,6 +106,18 @@ impl Web3ProxyApp {
})
}
pub fn get_balanced_rpcs(&self) -> &Web3Connections {
&self.balanced_rpcs
}
pub fn get_private_rpcs(&self) -> &Web3Connections {
&self.private_rpcs
}
pub fn get_active_requests(&self) -> &ActiveRequestsMap {
&self.active_requests
}
/// send the request to the approriate RPCs
/// TODO: dry this up
#[instrument(skip_all)]

@ -7,6 +7,8 @@ use governor::middleware::NoOpMiddleware;
use governor::state::{InMemoryState, NotKeyed};
use governor::NotUntil;
use governor::RateLimiter;
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use std::fmt;
use std::num::NonZeroU32;
use std::sync::atomic::{self, AtomicU32};
@ -78,8 +80,30 @@ pub struct Web3Connection {
soft_limit: u32,
/// the same clock that is used by the rate limiter
clock: QuantaClock,
// TODO: track total number of requests?
}
impl Serialize for Web3Connection {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
// 3 is the number of fields in the struct.
let mut state = serializer.serialize_struct("Web3Connection", 1)?;
// TODO: sanitize any credentials in the url
state.serialize_field("url", &self.url)?;
state.serialize_field("soft_limit", &self.soft_limit)?;
state.serialize_field(
"active_requests",
&self.active_requests.load(atomic::Ordering::Acquire),
)?;
state.end()
}
}
impl fmt::Debug for Web3Connection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Web3Connection")

@ -8,6 +8,8 @@ use futures::StreamExt;
use governor::clock::{QuantaClock, QuantaInstant};
use governor::NotUntil;
use hashbrown::HashMap;
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use serde_json::value::RawValue;
use std::cmp;
use std::collections::{BTreeMap, BTreeSet};
@ -20,7 +22,8 @@ use tracing::{info, info_span, instrument, trace, warn};
use crate::config::Web3ConnectionConfig;
use crate::connection::{ActiveRequestHandle, Web3Connection};
#[derive(Clone, Default)]
// Serialize so we can print it on our debug endpoint
#[derive(Clone, Default, Serialize)]
struct SyncedConnections {
head_block_num: u64,
head_block_hash: H256,
@ -47,6 +50,21 @@ pub struct Web3Connections {
synced_connections: ArcSwap<SyncedConnections>,
}
impl Serialize for Web3Connections {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let inner: Vec<&Web3Connection> = self.inner.iter().map(|x| x.as_ref()).collect();
// 3 is the number of fields in the struct.
let mut state = serializer.serialize_struct("Web3Connections", 2)?;
state.serialize_field("rpcs", &inner)?;
state.serialize_field("synced_connections", &**self.synced_connections.load())?;
state.end()
}
}
impl fmt::Debug for Web3Connections {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though

@ -9,6 +9,7 @@ use axum::{
Json,
Router,
};
use serde_json::json;
use serde_json::value::RawValue;
use std::net::SocketAddr;
use std::sync::Arc;
@ -52,17 +53,32 @@ async fn root() -> impl IntoResponse {
// TODO: i can't get https://docs.rs/axum/latest/axum/error_handling/index.html to work
async fn proxy_web3_rpc(
payload: Json<JsonRpcRequestEnum>,
state: Extension<Arc<Web3ProxyApp>>,
app: Extension<Arc<Web3ProxyApp>>,
) -> impl IntoResponse {
match state.0.proxy_web3_rpc(payload.0).await {
match app.0.proxy_web3_rpc(payload.0).await {
Ok(response) => (StatusCode::OK, serde_json::to_string(&response).unwrap()),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e)),
}
}
/// Status page
async fn status(state: Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
(StatusCode::INTERNAL_SERVER_ERROR, "Hello, list_rpcs!")
/// Very basic status page
async fn status(app: Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
let app = app.0.as_ref();
let balanced_rpcs = app.get_balanced_rpcs();
let private_rpcs = app.get_private_rpcs();
let num_active_requests = app.get_active_requests().len();
// TODO: what else should we include? uptime? prometheus?
let body = json!({
"balanced_rpcs": balanced_rpcs,
"private_rpcs": private_rpcs,
"num_active_requests": num_active_requests,
});
(StatusCode::INTERNAL_SERVER_ERROR, body.to_string())
}
async fn handler_404() -> impl IntoResponse {
@ -93,3 +109,5 @@ async fn _handle_anyhow_error(err: anyhow::Error) -> impl IntoResponse {
serde_json::to_string(&err).unwrap(),
)
}
// i think we want a custom result type. it has an anyhow result inside. it impl IntoResponse