From 269aa32260a4c3e94d7ada55fee6c8e62d975c87 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 14 Jul 2023 18:29:48 -0700 Subject: [PATCH] First pass implementation of async serialize for Web3Rps (#192) Co-authored-by: Rory Neithinger --- web3_proxy/src/app/mod.rs | 2 +- web3_proxy/src/frontend/status.rs | 45 +++++++++++++++++++++++++--- web3_proxy/src/rpcs/consensus.rs | 2 +- web3_proxy/src/rpcs/many.rs | 49 +++++++++++++++++-------------- 4 files changed, 70 insertions(+), 28 deletions(-) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index c3eadbbb..03c058a0 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -1029,7 +1029,7 @@ impl Web3ProxyApp { request_metadata: &Arc, ) -> Web3ProxyResult> { if let Some(protected_rpcs) = self.private_rpcs.as_ref() { - if !protected_rpcs.is_empty() { + if !protected_rpcs.is_empty().await { let protected_response = protected_rpcs .try_send_all_synced_connections( method, diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs index 6ef5d8c5..f6b65509 100644 --- a/web3_proxy/src/frontend/status.rs +++ b/web3_proxy/src/frontend/status.rs @@ -21,7 +21,7 @@ use http::HeaderMap; use moka::future::Cache; use once_cell::sync::Lazy; use serde::{ser::SerializeStruct, Serialize}; -use serde_json::json; +use serde_json::{json, value::RawValue}; use std::{sync::Arc, time::Duration}; use tokio::time::timeout; use tracing::trace; @@ -190,11 +190,48 @@ async fn _status(app: Arc) -> (StatusCode, &'static str, Bytes) { // TODO: get out of app.balanced_rpcs instead? let head_block = app.watch_consensus_head_receiver.borrow().clone(); + let balanced_rpcs = { + let mut buf = Vec::::new(); + let ser = serde_json::Serializer::new(&mut buf); + app.balanced_rpcs + .serialize_async(ser) + .await + .expect("TODO: can this fail?"); + let strbuf = String::from(std::str::from_utf8(&buf).expect("TODO: can this fail?")); + RawValue::from_string(strbuf).expect("TODO: can this fail?") + }; + + let bundler_4337_rpcs = match &app.bundler_4337_rpcs { + Some(rpcs) => { + let mut buf = Vec::::new(); + let ser = serde_json::Serializer::new(&mut buf); + rpcs.serialize_async(ser) + .await + .expect("TODO: can this fail?"); + let strbuf = String::from(std::str::from_utf8(&buf).expect("TODO: can this fail?")); + Some(RawValue::from_string(strbuf).expect("TODO: can this fail?")) + } + None => None, + }; + + let private_rpcs = match &app.private_rpcs { + Some(rpcs) => { + let mut buf = Vec::::new(); + let ser = serde_json::Serializer::new(&mut buf); + rpcs.serialize_async(ser) + .await + .expect("TODO: can this fail?"); + let strbuf = String::from(std::str::from_utf8(&buf).expect("TODO: can this fail?")); + Some(RawValue::from_string(strbuf).expect("TODO: can this fail?")) + } + None => None, + }; + // TODO: what else should we include? uptime, cache hit rates, cpu load, memory used // TODO: the hostname is probably not going to change. only get once at the start? let body = json!({ - "balanced_rpcs": app.balanced_rpcs, - "bundler_4337_rpcs": app.bundler_4337_rpcs, + "balanced_rpcs": balanced_rpcs, + "bundler_4337_rpcs": bundler_4337_rpcs, "caches": [ MokaCacheSerializer(&app.ip_semaphores), MokaCacheSerializer(&app.jsonrpc_response_cache), @@ -207,7 +244,7 @@ async fn _status(app: Arc) -> (StatusCode, &'static str, Bytes) { "head_block_hash": head_block.as_ref().map(|x| x.hash()), "hostname": app.hostname, "payment_factory_address": app.config.deposit_factory_contract, - "private_rpcs": app.private_rpcs, + "private_rpcs": private_rpcs, "version": APP_USER_AGENT, }); diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 76bf16ed..a50c64b7 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -442,7 +442,7 @@ impl ConsensusFinder { let consensus_head_block = new_ranked_rpcs.head_block.clone(); let num_consensus_rpcs = new_ranked_rpcs.num_active_rpcs(); let num_active_rpcs = self.len(); - let total_rpcs = web3_rpcs.len(); + let total_rpcs = web3_rpcs.len().await; let new_ranked_rpcs = Arc::new(new_ranked_rpcs); diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index ce6b0bf7..ef0408f1 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -19,7 +19,6 @@ use futures::StreamExt; use hashbrown::HashMap; use itertools::Itertools; use moka::future::CacheBuilder; -use parking_lot::RwLock; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::json; @@ -29,7 +28,7 @@ use std::fmt::{self, Display}; use std::sync::atomic::Ordering; use std::sync::Arc; use tokio::select; -use tokio::sync::{mpsc, watch}; +use tokio::sync::{mpsc, watch, RwLock}; use tokio::time::{sleep, sleep_until, Duration, Instant}; use tracing::{debug, error, info, trace, warn}; @@ -217,7 +216,7 @@ impl Web3Rpcs { Ok(Ok((new_rpc, _handle))) => { // web3 connection worked - let old_rpc = self.by_name.read().get(&new_rpc.name).map(Arc::clone); + let old_rpc = self.by_name.read().await.get(&new_rpc.name).map(Arc::clone); // clean up the old rpc if it exists if let Some(old_rpc) = old_rpc { @@ -239,7 +238,10 @@ impl Web3Rpcs { // new rpc is synced (or old one was not synced). update the local map // make sure that any new requests use the new connection - self.by_name.write().insert(new_rpc.name.clone(), new_rpc); + self.by_name + .write() + .await + .insert(new_rpc.name.clone(), new_rpc); // tell the old rpc to disconnect if let Some(ref disconnect_sender) = old_rpc.disconnect_watch { @@ -247,7 +249,10 @@ impl Web3Rpcs { disconnect_sender.send_replace(true); } } else { - self.by_name.write().insert(new_rpc.name.clone(), new_rpc); + self.by_name + .write() + .await + .insert(new_rpc.name.clone(), new_rpc); } } Ok(Err(err)) => { @@ -264,12 +269,12 @@ impl Web3Rpcs { } // TODO: remove any RPCs that were part of the config, but are now removed - let active_names: Vec<_> = self.by_name.read().keys().cloned().collect(); + let active_names: Vec<_> = self.by_name.read().await.keys().cloned().collect(); for name in active_names { if names_to_keep.contains(&name) { continue; } - if let Some(old_rpc) = self.by_name.write().remove(&name) { + if let Some(old_rpc) = self.by_name.write().await.remove(&name) { if let Some(ref disconnect_sender) = old_rpc.disconnect_watch { debug!("telling {} to disconnect. no longer needed", old_rpc); disconnect_sender.send_replace(true); @@ -277,7 +282,7 @@ impl Web3Rpcs { } } - let num_rpcs = self.len(); + let num_rpcs = self.len().await; if num_rpcs < self.min_synced_rpcs { return Err(Web3ProxyError::NotEnoughRpcs { @@ -289,16 +294,16 @@ impl Web3Rpcs { Ok(()) } - pub fn get(&self, conn_name: &str) -> Option> { - self.by_name.read().get(conn_name).cloned() + pub async fn get(&self, conn_name: &str) -> Option> { + self.by_name.read().await.get(conn_name).cloned() } - pub fn len(&self) -> usize { - self.by_name.read().len() + pub async fn len(&self) -> usize { + self.by_name.read().await.len() } - pub fn is_empty(&self) -> bool { - self.by_name.read().is_empty() + pub async fn is_empty(&self) -> bool { + self.by_name.read().await.is_empty() } /// TODO: rename to be consistent between "head" and "synced" @@ -645,7 +650,7 @@ impl Web3Rpcs { let mut earliest_retry_at = None; // TODO: filter the rpcs with Ranked.will_work_now - let mut all_rpcs: Vec<_> = self.by_name.read().values().cloned().collect(); + let mut all_rpcs: Vec<_> = self.by_name.read().await.values().cloned().collect(); let mut max_count = if let Some(max_count) = max_count { max_count @@ -1055,7 +1060,7 @@ impl Web3Rpcs { return Err(err.into()); } - let num_conns = self.len(); + let num_conns = self.len().await; let num_skipped = skip_rpcs.len(); let needed = min_block_needed.max(max_block_needed); @@ -1297,15 +1302,15 @@ impl fmt::Debug for Web3Rpcs { } } -impl Serialize for Web3Rpcs { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { +impl Web3Rpcs { + pub async fn serialize_async( + &self, + mut serializer: serde_json::Serializer, + ) -> Result<(), serde_json::Error> { let mut state = serializer.serialize_struct("Web3Rpcs", 5)?; { - let by_name = self.by_name.read(); + let by_name = self.by_name.read().await; let rpcs: Vec<&Arc> = by_name.values().collect(); // TODO: coordinate with frontend team to rename "conns" to "rpcs" state.serialize_field("conns", &rpcs)?;