From d8e1bd44dc3cc53a6d8b7894d9280ee4a31c35ba Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 11 Oct 2023 12:34:56 -0700 Subject: [PATCH] async serialize --- web3_proxy/src/frontend/status.rs | 6 +- web3_proxy/src/rpcs/many.rs | 94 ++++++++++++------------------- 2 files changed, 39 insertions(+), 61 deletions(-) diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs index 2bea60b2..31b773b6 100644 --- a/web3_proxy/src/frontend/status.rs +++ b/web3_proxy/src/frontend/status.rs @@ -193,8 +193,8 @@ async fn _status(app: Arc) -> (StatusCode, &'static str, Bytes) { // TODO: what else should we include? uptime, cache hit rates, cpu load, memory used // TODO: the hostname is probably not going to change. only get once at the start? let body = json!({ - "balanced_rpcs": app.balanced_rpcs, - "bundler_4337_rpcs": app.bundler_4337_rpcs, + "balanced_rpcs": app.balanced_rpcs.as_json().await, + "bundler_4337_rpcs": app.bundler_4337_rpcs.as_json().await, "caches": [ MokaCacheSerializer(&app.ip_semaphores), MokaCacheSerializer(&app.jsonrpc_response_cache), @@ -208,7 +208,7 @@ async fn _status(app: Arc) -> (StatusCode, &'static str, Bytes) { "hostname": app.hostname, "payment_factory_address": app.config.deposit_factory_contract, "pending_txid_firehose": app.pending_txid_firehose, - "private_rpcs": app.protected_rpcs, + "private_rpcs": app.protected_rpcs.as_json().await, "uptime": app.start.elapsed().as_secs(), "version": APP_USER_AGENT, }); diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 606d5dd7..f7956464 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -18,9 +18,7 @@ use futures_util::future::join_all; use hashbrown::HashMap; use http::StatusCode; use moka::future::CacheBuilder; -use serde::ser::{SerializeStruct, Serializer}; -use serde::Serialize; -use serde_json::json; +use serde_json::{json, Value}; use std::borrow::Cow; use std::fmt::{self, Display}; use std::sync::Arc; @@ -624,6 +622,38 @@ impl Web3Rpcs { ProxyMode::Versus => todo!("Versus"), } } + + /// TODO: this should be called `async_serialize` and take a Serializer, but this is easier + pub async fn as_json(&self) -> Value { + // TODO: coordinate with frontend team to rename "conns" to "rpcs" + let rpcs: Vec<_> = { + let by_name = self.by_name.read().await; + by_name.values().cloned().collect() + }; + + let ranked_rpcs = self.watch_ranked_rpcs.borrow().as_ref().cloned(); + + let caches = ( + MokaCacheSerializer(&self.blocks_by_hash), + MokaCacheSerializer(&self.blocks_by_number), + ); + + let watch_consensus_rpcs_receivers = self.watch_ranked_rpcs.receiver_count(); + + let watch_consensus_head_receivers = if let Some(ref x) = self.watch_head_block { + Some(x.receiver_count()) + } else { + None + }; + + json!({ + "conns": rpcs, + "synced_connections": ranked_rpcs, + "caches": caches, + "watch_consensus_rpcs_receivers": watch_consensus_rpcs_receivers, + "watch_consensus_head_receivers": watch_consensus_head_receivers, + }) + } } impl Display for Web3Rpcs { @@ -637,71 +667,19 @@ impl fmt::Debug for Web3Rpcs { // TODO: the default formatter takes forever to write. this is too quiet though let consensus_rpcs = self.watch_ranked_rpcs.borrow().is_some(); - // todo!(get names) - // let names = self.by_name.blocking_read(); - // let names = names.values().map(|x| x.name.as_str()).collect::>(); - let names = (); + // let rpcs = self.by_name.blocking_read(); + // let rpcs = rpcs.values().map(|x| x.name.as_str()).collect::>(); let head_block = self.head_block(); f.debug_struct("Web3Rpcs") - .field("rpcs", &names) + // .field("rpcs", &rpcs) .field("consensus_rpcs", &consensus_rpcs) .field("head_block", &head_block) .finish_non_exhaustive() } } -impl Serialize for Web3Rpcs { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - let mut state = serializer.serialize_struct("Web3Rpcs", 5)?; - - { - // todo!(get rpcs) - // let by_name = self.by_name.read().await; - // let rpcs: Vec<&Arc> = by_name.values().collect(); - // TODO: coordinate with frontend team to rename "conns" to "rpcs" - let rpcs = (); - state.serialize_field("conns", &rpcs)?; - } - - { - let consensus_rpcs = self.watch_ranked_rpcs.borrow().clone(); - // TODO: rename synced_connections to consensus_rpcs - - if let Some(consensus_rpcs) = consensus_rpcs.as_ref() { - state.serialize_field("synced_connections", consensus_rpcs)?; - } else { - state.serialize_field("synced_connections", &None::<()>)?; - } - } - - state.serialize_field( - "caches", - &( - MokaCacheSerializer(&self.blocks_by_hash), - MokaCacheSerializer(&self.blocks_by_number), - ), - )?; - - state.serialize_field( - "watch_consensus_rpcs_receivers", - &self.watch_ranked_rpcs.receiver_count(), - )?; - - if let Some(ref x) = self.watch_head_block { - state.serialize_field("watch_consensus_head_receivers", &x.receiver_count())?; - } else { - state.serialize_field("watch_consensus_head_receivers", &None::<()>)?; - } - - state.end() - } -} - mod tests { #![allow(unused_imports)]