diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 1be2dea9..d74300e3 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -1181,7 +1181,7 @@ impl Web3ProxyApp { mut params: serde_json::Value, head_block_num: Option, request_metadata: &Arc, - ) -> Web3ProxyResult>> { + ) -> Web3ProxyResult>> { // TODO: don't clone into a new string? let request_method = method.to_string(); @@ -1189,7 +1189,7 @@ impl Web3ProxyApp { // TODO: serve net_version without querying the backend // TODO: don't force RawValue - let response_data: JsonRpcResponseEnum> = match request_method.as_ref() { + let response_data: JsonRpcResponseEnum> = match request_method.as_ref() { // lots of commands are blocked method @ ("db_getHex" | "db_getString" @@ -1701,14 +1701,14 @@ impl Web3ProxyApp { match self .jsonrpc_response_cache - .get_value_or_guard_async(cache_key).await + .get_value_or_guard_async(cache_key.hash()).await { Ok(x) => x, Err(x) => { let response_data = timeout( duration, self.balanced_rpcs - .try_proxy_connection::<_, Box>( + .try_proxy_connection::<_, Arc>( method, ¶ms, Some(request_metadata), @@ -1718,7 +1718,7 @@ impl Web3ProxyApp { ) .await?; - let response_data: JsonRpcResponseEnum> = response_data.try_into()?; + let response_data: JsonRpcResponseEnum> = response_data.try_into()?; if matches!(response_data, JsonRpcResponseEnum::Result { .. }) || cache_errors { // TODO: convert the Box to an Arc? @@ -1732,7 +1732,7 @@ impl Web3ProxyApp { let x = timeout( duration, self.balanced_rpcs - .try_proxy_connection::<_, Box>( + .try_proxy_connection::<_, Arc>( method, ¶ms, Some(request_metadata), diff --git a/web3_proxy/src/jsonrpc.rs b/web3_proxy/src/jsonrpc.rs index eb7a01a7..9b67c508 100644 --- a/web3_proxy/src/jsonrpc.rs +++ b/web3_proxy/src/jsonrpc.rs @@ -8,6 +8,7 @@ use serde_json::json; use serde_json::value::{to_raw_value, RawValue}; use std::borrow::Cow; use std::fmt; +use std::sync::Arc; pub trait JsonRpcParams = Clone + fmt::Debug + serde::Serialize + Send + Sync + 'static; pub trait JsonRpcResultData = serde::Serialize + serde::de::DeserializeOwned + fmt::Debug + Send; @@ -234,7 +235,7 @@ pub struct JsonRpcForwardedResponse { pub jsonrpc: &'static str, pub id: Box, #[serde(skip_serializing_if = "Option::is_none")] - pub result: Option>, + pub result: Option>, #[serde(skip_serializing_if = "Option::is_none")] pub error: Option, } @@ -279,7 +280,7 @@ impl JsonRpcForwardedResponse { } } - pub fn from_raw_response(result: Box, id: Box) -> Self { + pub fn from_raw_response(result: Arc, id: Box) -> Self { JsonRpcForwardedResponse { jsonrpc: "2.0", id, @@ -292,6 +293,9 @@ impl JsonRpcForwardedResponse { pub fn from_value(result: serde_json::Value, id: Box) -> Self { let partial_response = to_raw_value(&result).expect("Value to RawValue should always work"); + // TODO: an Arc is a waste here. change JsonRpcForwardedResponse to take an enum? + let partial_response = partial_response.into(); + JsonRpcForwardedResponse { jsonrpc: "2.0", id, @@ -339,7 +343,7 @@ impl JsonRpcForwardedResponse { } pub fn try_from_response_result( - result: Result, ProviderError>, + result: Result, ProviderError>, id: Box, ) -> Web3ProxyResult { match result { @@ -348,7 +352,7 @@ impl JsonRpcForwardedResponse { } } - pub fn from_response_data(data: JsonRpcResponseEnum>, id: Box) -> Self { + pub fn from_response_data(data: JsonRpcResponseEnum>, id: Box) -> Self { match data { JsonRpcResponseEnum::Result { value, .. } => Self::from_raw_response(value, id), JsonRpcResponseEnum::RpcError { diff --git a/web3_proxy/src/response_cache.rs b/web3_proxy/src/response_cache.rs index 78fb6f2c..562e7de4 100644 --- a/web3_proxy/src/response_cache.rs +++ b/web3_proxy/src/response_cache.rs @@ -8,6 +8,7 @@ use std::{ borrow::Cow, hash::{BuildHasher, Hash, Hasher}, num::NonZeroU32, + sync::Arc, }; #[derive(Clone, Debug, Eq, From)] @@ -19,6 +20,9 @@ pub struct JsonRpcQueryCacheKey { } impl JsonRpcQueryCacheKey { + pub fn hash(&self) -> u64 { + self.hash + } pub fn from_block_num(&self) -> Option { self.from_block_num } @@ -79,7 +83,7 @@ impl JsonRpcQueryCacheKey { } pub type JsonRpcResponseCache = - CacheWithTTL>, JsonRpcResponseWeigher>; + CacheWithTTL>, JsonRpcResponseWeigher>; #[derive(Clone)] pub struct JsonRpcResponseWeigher; @@ -107,7 +111,7 @@ impl JsonRpcResponseEnum { } } -impl From for JsonRpcResponseEnum> { +impl From for JsonRpcResponseEnum> { fn from(value: serde_json::Value) -> Self { let value = RawValue::from_string(value.to_string()).unwrap(); @@ -115,12 +119,24 @@ impl From for JsonRpcResponseEnum> { } } -impl From> for JsonRpcResponseEnum> { +impl From> for JsonRpcResponseEnum> { + fn from(value: Arc) -> Self { + let num_bytes = value.get().len(); + + let num_bytes = NonZeroU32::try_from(num_bytes as u32).unwrap(); + + Self::Result { value, num_bytes } + } +} + +impl From> for JsonRpcResponseEnum> { fn from(value: Box) -> Self { let num_bytes = value.get().len(); let num_bytes = NonZeroU32::try_from(num_bytes as u32).unwrap(); + let value = value.into(); + Self::Result { value, num_bytes } } } @@ -140,7 +156,21 @@ impl TryFrom for JsonRpcResponseEnum { } } -impl TryFrom, Web3ProxyError>> for JsonRpcResponseEnum> { +impl TryFrom, Web3ProxyError>> for JsonRpcResponseEnum> { + type Error = Web3ProxyError; + + fn try_from(value: Result, Web3ProxyError>) -> Result { + match value { + Ok(x) => Ok(x.into()), + Err(err) => { + let x: Self = err.try_into()?; + + Ok(x) + } + } + } +} +impl TryFrom, Web3ProxyError>> for JsonRpcResponseEnum> { type Error = Web3ProxyError; fn try_from(value: Result, Web3ProxyError>) -> Result { @@ -205,8 +235,9 @@ impl TryFrom for JsonRpcErrorData { } } -impl Weighter>> for JsonRpcResponseWeigher { - fn weight(&self, _key: &K, _qey: &Q, value: &JsonRpcResponseEnum>) -> NonZeroU32 { +// TODO: instead of Arc, be generic +impl Weighter>> for JsonRpcResponseWeigher { + fn weight(&self, _key: &K, _qey: &Q, value: &JsonRpcResponseEnum>) -> NonZeroU32 { value.num_bytes() } } @@ -216,7 +247,7 @@ mod tests { use super::{JsonRpcResponseEnum, JsonRpcResponseWeigher}; use quick_cache_ttl::CacheWithTTL; use serde_json::value::RawValue; - use std::{num::NonZeroU32, time::Duration}; + use std::{num::NonZeroU32, sync::Arc, time::Duration}; #[tokio::test(start_paused = true)] async fn test_json_rpc_query_weigher() { @@ -225,7 +256,7 @@ mod tests { let test_cache: CacheWithTTL< u32, - JsonRpcResponseEnum>, + JsonRpcResponseEnum>, JsonRpcResponseWeigher, > = CacheWithTTL::new_with_weights( "test", @@ -238,17 +269,17 @@ mod tests { .await; let small_data = JsonRpcResponseEnum::Result { - value: Default::default(), + value: Box::::default().into(), num_bytes: NonZeroU32::try_from(max_item_weight / 2).unwrap(), }; let max_sized_data = JsonRpcResponseEnum::Result { - value: Default::default(), + value: Box::::default().into(), num_bytes: NonZeroU32::try_from(max_item_weight).unwrap(), }; let oversized_data = JsonRpcResponseEnum::Result { - value: Default::default(), + value: Box::::default().into(), num_bytes: NonZeroU32::try_from(max_item_weight * 2).unwrap(), };