First pass implementation of async serialize for Web3Rps (#192)
Co-authored-by: Rory Neithinger <rorytrent@gmail.com>
This commit is contained in:
parent
fe45813e4c
commit
269aa32260
@ -1029,7 +1029,7 @@ impl Web3ProxyApp {
|
|||||||
request_metadata: &Arc<RequestMetadata>,
|
request_metadata: &Arc<RequestMetadata>,
|
||||||
) -> Web3ProxyResult<Box<RawValue>> {
|
) -> Web3ProxyResult<Box<RawValue>> {
|
||||||
if let Some(protected_rpcs) = self.private_rpcs.as_ref() {
|
if let Some(protected_rpcs) = self.private_rpcs.as_ref() {
|
||||||
if !protected_rpcs.is_empty() {
|
if !protected_rpcs.is_empty().await {
|
||||||
let protected_response = protected_rpcs
|
let protected_response = protected_rpcs
|
||||||
.try_send_all_synced_connections(
|
.try_send_all_synced_connections(
|
||||||
method,
|
method,
|
||||||
|
@ -21,7 +21,7 @@ use http::HeaderMap;
|
|||||||
use moka::future::Cache;
|
use moka::future::Cache;
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use serde::{ser::SerializeStruct, Serialize};
|
use serde::{ser::SerializeStruct, Serialize};
|
||||||
use serde_json::json;
|
use serde_json::{json, value::RawValue};
|
||||||
use std::{sync::Arc, time::Duration};
|
use std::{sync::Arc, time::Duration};
|
||||||
use tokio::time::timeout;
|
use tokio::time::timeout;
|
||||||
use tracing::trace;
|
use tracing::trace;
|
||||||
@ -190,11 +190,48 @@ async fn _status(app: Arc<Web3ProxyApp>) -> (StatusCode, &'static str, Bytes) {
|
|||||||
// TODO: get out of app.balanced_rpcs instead?
|
// TODO: get out of app.balanced_rpcs instead?
|
||||||
let head_block = app.watch_consensus_head_receiver.borrow().clone();
|
let head_block = app.watch_consensus_head_receiver.borrow().clone();
|
||||||
|
|
||||||
|
let balanced_rpcs = {
|
||||||
|
let mut buf = Vec::<u8>::new();
|
||||||
|
let ser = serde_json::Serializer::new(&mut buf);
|
||||||
|
app.balanced_rpcs
|
||||||
|
.serialize_async(ser)
|
||||||
|
.await
|
||||||
|
.expect("TODO: can this fail?");
|
||||||
|
let strbuf = String::from(std::str::from_utf8(&buf).expect("TODO: can this fail?"));
|
||||||
|
RawValue::from_string(strbuf).expect("TODO: can this fail?")
|
||||||
|
};
|
||||||
|
|
||||||
|
let bundler_4337_rpcs = match &app.bundler_4337_rpcs {
|
||||||
|
Some(rpcs) => {
|
||||||
|
let mut buf = Vec::<u8>::new();
|
||||||
|
let ser = serde_json::Serializer::new(&mut buf);
|
||||||
|
rpcs.serialize_async(ser)
|
||||||
|
.await
|
||||||
|
.expect("TODO: can this fail?");
|
||||||
|
let strbuf = String::from(std::str::from_utf8(&buf).expect("TODO: can this fail?"));
|
||||||
|
Some(RawValue::from_string(strbuf).expect("TODO: can this fail?"))
|
||||||
|
}
|
||||||
|
None => None,
|
||||||
|
};
|
||||||
|
|
||||||
|
let private_rpcs = match &app.private_rpcs {
|
||||||
|
Some(rpcs) => {
|
||||||
|
let mut buf = Vec::<u8>::new();
|
||||||
|
let ser = serde_json::Serializer::new(&mut buf);
|
||||||
|
rpcs.serialize_async(ser)
|
||||||
|
.await
|
||||||
|
.expect("TODO: can this fail?");
|
||||||
|
let strbuf = String::from(std::str::from_utf8(&buf).expect("TODO: can this fail?"));
|
||||||
|
Some(RawValue::from_string(strbuf).expect("TODO: can this fail?"))
|
||||||
|
}
|
||||||
|
None => None,
|
||||||
|
};
|
||||||
|
|
||||||
// TODO: what else should we include? uptime, cache hit rates, cpu load, memory used
|
// TODO: what else should we include? uptime, cache hit rates, cpu load, memory used
|
||||||
// TODO: the hostname is probably not going to change. only get once at the start?
|
// TODO: the hostname is probably not going to change. only get once at the start?
|
||||||
let body = json!({
|
let body = json!({
|
||||||
"balanced_rpcs": app.balanced_rpcs,
|
"balanced_rpcs": balanced_rpcs,
|
||||||
"bundler_4337_rpcs": app.bundler_4337_rpcs,
|
"bundler_4337_rpcs": bundler_4337_rpcs,
|
||||||
"caches": [
|
"caches": [
|
||||||
MokaCacheSerializer(&app.ip_semaphores),
|
MokaCacheSerializer(&app.ip_semaphores),
|
||||||
MokaCacheSerializer(&app.jsonrpc_response_cache),
|
MokaCacheSerializer(&app.jsonrpc_response_cache),
|
||||||
@ -207,7 +244,7 @@ async fn _status(app: Arc<Web3ProxyApp>) -> (StatusCode, &'static str, Bytes) {
|
|||||||
"head_block_hash": head_block.as_ref().map(|x| x.hash()),
|
"head_block_hash": head_block.as_ref().map(|x| x.hash()),
|
||||||
"hostname": app.hostname,
|
"hostname": app.hostname,
|
||||||
"payment_factory_address": app.config.deposit_factory_contract,
|
"payment_factory_address": app.config.deposit_factory_contract,
|
||||||
"private_rpcs": app.private_rpcs,
|
"private_rpcs": private_rpcs,
|
||||||
"version": APP_USER_AGENT,
|
"version": APP_USER_AGENT,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -442,7 +442,7 @@ impl ConsensusFinder {
|
|||||||
let consensus_head_block = new_ranked_rpcs.head_block.clone();
|
let consensus_head_block = new_ranked_rpcs.head_block.clone();
|
||||||
let num_consensus_rpcs = new_ranked_rpcs.num_active_rpcs();
|
let num_consensus_rpcs = new_ranked_rpcs.num_active_rpcs();
|
||||||
let num_active_rpcs = self.len();
|
let num_active_rpcs = self.len();
|
||||||
let total_rpcs = web3_rpcs.len();
|
let total_rpcs = web3_rpcs.len().await;
|
||||||
|
|
||||||
let new_ranked_rpcs = Arc::new(new_ranked_rpcs);
|
let new_ranked_rpcs = Arc::new(new_ranked_rpcs);
|
||||||
|
|
||||||
|
@ -19,7 +19,6 @@ use futures::StreamExt;
|
|||||||
use hashbrown::HashMap;
|
use hashbrown::HashMap;
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use moka::future::CacheBuilder;
|
use moka::future::CacheBuilder;
|
||||||
use parking_lot::RwLock;
|
|
||||||
use serde::ser::{SerializeStruct, Serializer};
|
use serde::ser::{SerializeStruct, Serializer};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
@ -29,7 +28,7 @@ use std::fmt::{self, Display};
|
|||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::select;
|
use tokio::select;
|
||||||
use tokio::sync::{mpsc, watch};
|
use tokio::sync::{mpsc, watch, RwLock};
|
||||||
use tokio::time::{sleep, sleep_until, Duration, Instant};
|
use tokio::time::{sleep, sleep_until, Duration, Instant};
|
||||||
use tracing::{debug, error, info, trace, warn};
|
use tracing::{debug, error, info, trace, warn};
|
||||||
|
|
||||||
@ -217,7 +216,7 @@ impl Web3Rpcs {
|
|||||||
Ok(Ok((new_rpc, _handle))) => {
|
Ok(Ok((new_rpc, _handle))) => {
|
||||||
// web3 connection worked
|
// web3 connection worked
|
||||||
|
|
||||||
let old_rpc = self.by_name.read().get(&new_rpc.name).map(Arc::clone);
|
let old_rpc = self.by_name.read().await.get(&new_rpc.name).map(Arc::clone);
|
||||||
|
|
||||||
// clean up the old rpc if it exists
|
// clean up the old rpc if it exists
|
||||||
if let Some(old_rpc) = old_rpc {
|
if let Some(old_rpc) = old_rpc {
|
||||||
@ -239,7 +238,10 @@ impl Web3Rpcs {
|
|||||||
|
|
||||||
// new rpc is synced (or old one was not synced). update the local map
|
// new rpc is synced (or old one was not synced). update the local map
|
||||||
// make sure that any new requests use the new connection
|
// make sure that any new requests use the new connection
|
||||||
self.by_name.write().insert(new_rpc.name.clone(), new_rpc);
|
self.by_name
|
||||||
|
.write()
|
||||||
|
.await
|
||||||
|
.insert(new_rpc.name.clone(), new_rpc);
|
||||||
|
|
||||||
// tell the old rpc to disconnect
|
// tell the old rpc to disconnect
|
||||||
if let Some(ref disconnect_sender) = old_rpc.disconnect_watch {
|
if let Some(ref disconnect_sender) = old_rpc.disconnect_watch {
|
||||||
@ -247,7 +249,10 @@ impl Web3Rpcs {
|
|||||||
disconnect_sender.send_replace(true);
|
disconnect_sender.send_replace(true);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
self.by_name.write().insert(new_rpc.name.clone(), new_rpc);
|
self.by_name
|
||||||
|
.write()
|
||||||
|
.await
|
||||||
|
.insert(new_rpc.name.clone(), new_rpc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(Err(err)) => {
|
Ok(Err(err)) => {
|
||||||
@ -264,12 +269,12 @@ impl Web3Rpcs {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TODO: remove any RPCs that were part of the config, but are now removed
|
// TODO: remove any RPCs that were part of the config, but are now removed
|
||||||
let active_names: Vec<_> = self.by_name.read().keys().cloned().collect();
|
let active_names: Vec<_> = self.by_name.read().await.keys().cloned().collect();
|
||||||
for name in active_names {
|
for name in active_names {
|
||||||
if names_to_keep.contains(&name) {
|
if names_to_keep.contains(&name) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if let Some(old_rpc) = self.by_name.write().remove(&name) {
|
if let Some(old_rpc) = self.by_name.write().await.remove(&name) {
|
||||||
if let Some(ref disconnect_sender) = old_rpc.disconnect_watch {
|
if let Some(ref disconnect_sender) = old_rpc.disconnect_watch {
|
||||||
debug!("telling {} to disconnect. no longer needed", old_rpc);
|
debug!("telling {} to disconnect. no longer needed", old_rpc);
|
||||||
disconnect_sender.send_replace(true);
|
disconnect_sender.send_replace(true);
|
||||||
@ -277,7 +282,7 @@ impl Web3Rpcs {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let num_rpcs = self.len();
|
let num_rpcs = self.len().await;
|
||||||
|
|
||||||
if num_rpcs < self.min_synced_rpcs {
|
if num_rpcs < self.min_synced_rpcs {
|
||||||
return Err(Web3ProxyError::NotEnoughRpcs {
|
return Err(Web3ProxyError::NotEnoughRpcs {
|
||||||
@ -289,16 +294,16 @@ impl Web3Rpcs {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get(&self, conn_name: &str) -> Option<Arc<Web3Rpc>> {
|
pub async fn get(&self, conn_name: &str) -> Option<Arc<Web3Rpc>> {
|
||||||
self.by_name.read().get(conn_name).cloned()
|
self.by_name.read().await.get(conn_name).cloned()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn len(&self) -> usize {
|
pub async fn len(&self) -> usize {
|
||||||
self.by_name.read().len()
|
self.by_name.read().await.len()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn is_empty(&self) -> bool {
|
pub async fn is_empty(&self) -> bool {
|
||||||
self.by_name.read().is_empty()
|
self.by_name.read().await.is_empty()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// TODO: rename to be consistent between "head" and "synced"
|
/// TODO: rename to be consistent between "head" and "synced"
|
||||||
@ -645,7 +650,7 @@ impl Web3Rpcs {
|
|||||||
let mut earliest_retry_at = None;
|
let mut earliest_retry_at = None;
|
||||||
|
|
||||||
// TODO: filter the rpcs with Ranked.will_work_now
|
// TODO: filter the rpcs with Ranked.will_work_now
|
||||||
let mut all_rpcs: Vec<_> = self.by_name.read().values().cloned().collect();
|
let mut all_rpcs: Vec<_> = self.by_name.read().await.values().cloned().collect();
|
||||||
|
|
||||||
let mut max_count = if let Some(max_count) = max_count {
|
let mut max_count = if let Some(max_count) = max_count {
|
||||||
max_count
|
max_count
|
||||||
@ -1055,7 +1060,7 @@ impl Web3Rpcs {
|
|||||||
return Err(err.into());
|
return Err(err.into());
|
||||||
}
|
}
|
||||||
|
|
||||||
let num_conns = self.len();
|
let num_conns = self.len().await;
|
||||||
let num_skipped = skip_rpcs.len();
|
let num_skipped = skip_rpcs.len();
|
||||||
|
|
||||||
let needed = min_block_needed.max(max_block_needed);
|
let needed = min_block_needed.max(max_block_needed);
|
||||||
@ -1297,15 +1302,15 @@ impl fmt::Debug for Web3Rpcs {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Serialize for Web3Rpcs {
|
impl Web3Rpcs {
|
||||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
pub async fn serialize_async<W: std::io::Write>(
|
||||||
where
|
&self,
|
||||||
S: Serializer,
|
mut serializer: serde_json::Serializer<W>,
|
||||||
{
|
) -> Result<(), serde_json::Error> {
|
||||||
let mut state = serializer.serialize_struct("Web3Rpcs", 5)?;
|
let mut state = serializer.serialize_struct("Web3Rpcs", 5)?;
|
||||||
|
|
||||||
{
|
{
|
||||||
let by_name = self.by_name.read();
|
let by_name = self.by_name.read().await;
|
||||||
let rpcs: Vec<&Arc<Web3Rpc>> = by_name.values().collect();
|
let rpcs: Vec<&Arc<Web3Rpc>> = by_name.values().collect();
|
||||||
// TODO: coordinate with frontend team to rename "conns" to "rpcs"
|
// TODO: coordinate with frontend team to rename "conns" to "rpcs"
|
||||||
state.serialize_field("conns", &rpcs)?;
|
state.serialize_field("conns", &rpcs)?;
|
||||||
|
Loading…
Reference in New Issue
Block a user