Revert "First pass implementation of async serialize for Web3Rpcs (#192)"

This reverts commit 269aa32260.
This commit is contained in:
Bryan Stitt 2023-07-19 23:09:39 -07:00
parent 0ff3d77391
commit e7905b4344
4 changed files with 29 additions and 70 deletions

View File

@@ -1100,7 +1100,7 @@ impl Web3ProxyApp {
request_metadata: &Arc<RequestMetadata>, request_metadata: &Arc<RequestMetadata>,
) -> Web3ProxyResult<Box<RawValue>> { ) -> Web3ProxyResult<Box<RawValue>> {
if let Some(protected_rpcs) = self.private_rpcs.as_ref() { if let Some(protected_rpcs) = self.private_rpcs.as_ref() {
if !protected_rpcs.is_empty().await { if !protected_rpcs.is_empty() {
let protected_response = protected_rpcs let protected_response = protected_rpcs
.try_send_all_synced_connections( .try_send_all_synced_connections(
method, method,

View File

@@ -21,7 +21,7 @@ use http::HeaderMap;
use moka::future::Cache; use moka::future::Cache;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use serde::{ser::SerializeStruct, Serialize}; use serde::{ser::SerializeStruct, Serialize};
use serde_json::{json, value::RawValue}; use serde_json::json;
use std::{sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};
use tokio::time::timeout; use tokio::time::timeout;
use tracing::trace; use tracing::trace;
@@ -190,48 +190,11 @@ async fn _status(app: Arc<Web3ProxyApp>) -> (StatusCode, &'static str, Bytes) {
// TODO: get out of app.balanced_rpcs instead? // TODO: get out of app.balanced_rpcs instead?
let head_block = app.watch_consensus_head_receiver.borrow().clone(); let head_block = app.watch_consensus_head_receiver.borrow().clone();
let balanced_rpcs = {
let mut buf = Vec::<u8>::new();
let ser = serde_json::Serializer::new(&mut buf);
app.balanced_rpcs
.serialize_async(ser)
.await
.expect("TODO: can this fail?");
let strbuf = String::from(std::str::from_utf8(&buf).expect("TODO: can this fail?"));
RawValue::from_string(strbuf).expect("TODO: can this fail?")
};
let bundler_4337_rpcs = match &app.bundler_4337_rpcs {
Some(rpcs) => {
let mut buf = Vec::<u8>::new();
let ser = serde_json::Serializer::new(&mut buf);
rpcs.serialize_async(ser)
.await
.expect("TODO: can this fail?");
let strbuf = String::from(std::str::from_utf8(&buf).expect("TODO: can this fail?"));
Some(RawValue::from_string(strbuf).expect("TODO: can this fail?"))
}
None => None,
};
let private_rpcs = match &app.private_rpcs {
Some(rpcs) => {
let mut buf = Vec::<u8>::new();
let ser = serde_json::Serializer::new(&mut buf);
rpcs.serialize_async(ser)
.await
.expect("TODO: can this fail?");
let strbuf = String::from(std::str::from_utf8(&buf).expect("TODO: can this fail?"));
Some(RawValue::from_string(strbuf).expect("TODO: can this fail?"))
}
None => None,
};
// TODO: what else should we include? uptime, cache hit rates, cpu load, memory used // TODO: what else should we include? uptime, cache hit rates, cpu load, memory used
// TODO: the hostname is probably not going to change. only get once at the start? // TODO: the hostname is probably not going to change. only get once at the start?
let body = json!({ let body = json!({
"balanced_rpcs": balanced_rpcs, "balanced_rpcs": app.balanced_rpcs,
"bundler_4337_rpcs": bundler_4337_rpcs, "bundler_4337_rpcs": app.bundler_4337_rpcs,
"caches": [ "caches": [
MokaCacheSerializer(&app.ip_semaphores), MokaCacheSerializer(&app.ip_semaphores),
MokaCacheSerializer(&app.jsonrpc_response_cache), MokaCacheSerializer(&app.jsonrpc_response_cache),
@@ -244,7 +207,7 @@ async fn _status(app: Arc<Web3ProxyApp>) -> (StatusCode, &'static str, Bytes) {
"head_block_hash": head_block.as_ref().map(|x| x.hash()), "head_block_hash": head_block.as_ref().map(|x| x.hash()),
"hostname": app.hostname, "hostname": app.hostname,
"payment_factory_address": app.config.deposit_factory_contract, "payment_factory_address": app.config.deposit_factory_contract,
"private_rpcs": private_rpcs, "private_rpcs": app.private_rpcs,
"version": APP_USER_AGENT, "version": APP_USER_AGENT,
}); });

View File

@@ -442,7 +442,7 @@ impl ConsensusFinder {
let consensus_head_block = new_ranked_rpcs.head_block.clone(); let consensus_head_block = new_ranked_rpcs.head_block.clone();
let num_consensus_rpcs = new_ranked_rpcs.num_active_rpcs(); let num_consensus_rpcs = new_ranked_rpcs.num_active_rpcs();
let num_active_rpcs = self.len(); let num_active_rpcs = self.len();
let total_rpcs = web3_rpcs.len().await; let total_rpcs = web3_rpcs.len();
let new_ranked_rpcs = Arc::new(new_ranked_rpcs); let new_ranked_rpcs = Arc::new(new_ranked_rpcs);

View File

@@ -19,7 +19,9 @@ use futures::StreamExt;
use hashbrown::HashMap; use hashbrown::HashMap;
use itertools::Itertools; use itertools::Itertools;
use moka::future::CacheBuilder; use moka::future::CacheBuilder;
use parking_lot::RwLock;
use serde::ser::{SerializeStruct, Serializer}; use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use serde_json::json; use serde_json::json;
use serde_json::value::RawValue; use serde_json::value::RawValue;
use std::borrow::Cow; use std::borrow::Cow;
@@ -28,7 +30,7 @@ use std::fmt::{self, Display};
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::Arc; use std::sync::Arc;
use tokio::select; use tokio::select;
use tokio::sync::{mpsc, watch, RwLock}; use tokio::sync::{mpsc, watch};
use tokio::time::{sleep, sleep_until, Duration, Instant}; use tokio::time::{sleep, sleep_until, Duration, Instant};
use tracing::{debug, error, info, trace, warn}; use tracing::{debug, error, info, trace, warn};
@@ -212,7 +214,7 @@ impl Web3Rpcs {
Ok(Ok((new_rpc, _handle))) => { Ok(Ok((new_rpc, _handle))) => {
// web3 connection worked // web3 connection worked
let old_rpc = self.by_name.read().await.get(&new_rpc.name).map(Arc::clone); let old_rpc = self.by_name.read().get(&new_rpc.name).map(Arc::clone);
// clean up the old rpc if it exists // clean up the old rpc if it exists
if let Some(old_rpc) = old_rpc { if let Some(old_rpc) = old_rpc {
@@ -234,10 +236,7 @@ impl Web3Rpcs {
// new rpc is synced (or old one was not synced). update the local map // new rpc is synced (or old one was not synced). update the local map
// make sure that any new requests use the new connection // make sure that any new requests use the new connection
self.by_name self.by_name.write().insert(new_rpc.name.clone(), new_rpc);
.write()
.await
.insert(new_rpc.name.clone(), new_rpc);
// tell the old rpc to disconnect // tell the old rpc to disconnect
if let Some(ref disconnect_sender) = old_rpc.disconnect_watch { if let Some(ref disconnect_sender) = old_rpc.disconnect_watch {
@@ -245,10 +244,7 @@ impl Web3Rpcs {
disconnect_sender.send_replace(true); disconnect_sender.send_replace(true);
} }
} else { } else {
self.by_name self.by_name.write().insert(new_rpc.name.clone(), new_rpc);
.write()
.await
.insert(new_rpc.name.clone(), new_rpc);
} }
} }
Ok(Err(err)) => { Ok(Err(err)) => {
@@ -265,12 +261,12 @@ impl Web3Rpcs {
} }
// TODO: remove any RPCs that were part of the config, but are now removed // TODO: remove any RPCs that were part of the config, but are now removed
let active_names: Vec<_> = self.by_name.read().await.keys().cloned().collect(); let active_names: Vec<_> = self.by_name.read().keys().cloned().collect();
for name in active_names { for name in active_names {
if names_to_keep.contains(&name) { if names_to_keep.contains(&name) {
continue; continue;
} }
if let Some(old_rpc) = self.by_name.write().await.remove(&name) { if let Some(old_rpc) = self.by_name.write().remove(&name) {
if let Some(ref disconnect_sender) = old_rpc.disconnect_watch { if let Some(ref disconnect_sender) = old_rpc.disconnect_watch {
debug!("telling {} to disconnect. no longer needed", old_rpc); debug!("telling {} to disconnect. no longer needed", old_rpc);
disconnect_sender.send_replace(true); disconnect_sender.send_replace(true);
@@ -278,7 +274,7 @@ impl Web3Rpcs {
} }
} }
let num_rpcs = self.len().await; let num_rpcs = self.len();
if num_rpcs < self.min_synced_rpcs { if num_rpcs < self.min_synced_rpcs {
return Err(Web3ProxyError::NotEnoughRpcs { return Err(Web3ProxyError::NotEnoughRpcs {
@@ -290,16 +286,16 @@ impl Web3Rpcs {
Ok(()) Ok(())
} }
pub async fn get(&self, conn_name: &str) -> Option<Arc<Web3Rpc>> { pub fn get(&self, conn_name: &str) -> Option<Arc<Web3Rpc>> {
self.by_name.read().await.get(conn_name).cloned() self.by_name.read().get(conn_name).cloned()
} }
pub async fn len(&self) -> usize { pub fn len(&self) -> usize {
self.by_name.read().await.len() self.by_name.read().len()
} }
pub async fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
self.by_name.read().await.is_empty() self.by_name.read().is_empty()
} }
/// TODO: rename to be consistent between "head" and "synced" /// TODO: rename to be consistent between "head" and "synced"
@@ -646,7 +642,7 @@ impl Web3Rpcs {
let mut earliest_retry_at = None; let mut earliest_retry_at = None;
// TODO: filter the rpcs with Ranked.will_work_now // TODO: filter the rpcs with Ranked.will_work_now
let mut all_rpcs: Vec<_> = self.by_name.read().await.values().cloned().collect(); let mut all_rpcs: Vec<_> = self.by_name.read().values().cloned().collect();
let mut max_count = if let Some(max_count) = max_count { let mut max_count = if let Some(max_count) = max_count {
max_count max_count
@@ -1055,7 +1051,7 @@ impl Web3Rpcs {
return Err(err.into()); return Err(err.into());
} }
let num_conns = self.len().await; let num_conns = self.len();
let num_skipped = skip_rpcs.len(); let num_skipped = skip_rpcs.len();
let needed = min_block_needed.max(max_block_needed); let needed = min_block_needed.max(max_block_needed);
@@ -1297,15 +1293,15 @@ impl fmt::Debug for Web3Rpcs {
} }
} }
impl Web3Rpcs { impl Serialize for Web3Rpcs {
pub async fn serialize_async<W: std::io::Write>( fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
&self, where
mut serializer: serde_json::Serializer<W>, S: Serializer,
) -> Result<(), serde_json::Error> { {
let mut state = serializer.serialize_struct("Web3Rpcs", 5)?; let mut state = serializer.serialize_struct("Web3Rpcs", 5)?;
{ {
let by_name = self.by_name.read().await; let by_name = self.by_name.read();
let rpcs: Vec<&Arc<Web3Rpc>> = by_name.values().collect(); let rpcs: Vec<&Arc<Web3Rpc>> = by_name.values().collect();
// TODO: coordinate with frontend team to rename "conns" to "rpcs" // TODO: coordinate with frontend team to rename "conns" to "rpcs"
state.serialize_field("conns", &rpcs)?; state.serialize_field("conns", &rpcs)?;