From 042707eca209d4ec1d896a822e373a960d6d5791 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 16 Oct 2023 23:56:38 -0700 Subject: [PATCH] maybe getting rid of this arc will fix streaming taking twice --- web3_proxy/src/app/mod.rs | 166 +++++++--------------- web3_proxy/src/app/ws.rs | 8 +- web3_proxy/src/errors.rs | 31 ++-- web3_proxy/src/frontend/authorization.rs | 14 +- web3_proxy/src/frontend/rpc_proxy_ws.rs | 2 +- web3_proxy/src/jsonrpc/error.rs | 14 +- web3_proxy/src/jsonrpc/request_builder.rs | 2 +- web3_proxy/src/jsonrpc/response.rs | 54 ++++--- web3_proxy/src/response_cache.rs | 109 +++++++------- web3_proxy/src/rpcs/request.rs | 4 +- 10 files changed, 163 insertions(+), 241 deletions(-) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 77d4981b..6b56cd3d 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -7,7 +7,7 @@ use crate::frontend::authorization::Authorization; use crate::globals::{global_db_conn, DatabaseError, APP, DB_CONN, DB_REPLICA}; use crate::jsonrpc::{ self, JsonRpcErrorData, JsonRpcParams, JsonRpcRequestEnum, JsonRpcResultData, LooseId, - SingleRequest, SingleResponse, ValidatedRequest, + ParsedResponse, SingleRequest, SingleResponse, ValidatedRequest, }; use crate::relational_db::{connect_db, migrate_db}; use crate::response_cache::{ForwardedResponse, JsonRpcResponseCache, JsonRpcResponseWeigher}; @@ -47,7 +47,7 @@ use std::time::Duration; use tokio::select; use tokio::sync::{broadcast, mpsc, oneshot, watch, Semaphore}; use tokio::task::{yield_now, JoinHandle}; -use tokio::time::{sleep, sleep_until, timeout, timeout_at, Instant}; +use tokio::time::{sleep, sleep_until, timeout_at, Instant}; use tracing::{error, info, trace, warn}; // TODO: make this customizable? @@ -1168,30 +1168,14 @@ impl App { let (code, response) = match last_response { Ok(response_data) => { web3_request.error_response.store(false, Ordering::Relaxed); + + // TODO: is it true that all jsonrpc errors are user errors? web3_request .user_error_response - .store(false, Ordering::Relaxed); + .store(response_data.is_jsonrpc_err(), Ordering::Relaxed); (StatusCode::OK, response_data) } - Err(err @ Web3ProxyError::NullJsonRpcResult) => { - web3_request.error_response.store(false, Ordering::Relaxed); - web3_request - .user_error_response - .store(false, Ordering::Relaxed); - - err.as_json_response_parts(web3_request.id()) - } - Err(Web3ProxyError::JsonRpcResponse(response_data)) => { - web3_request.error_response.store(false, Ordering::Relaxed); - web3_request - .user_error_response - .store(response_data.is_error(), Ordering::Relaxed); - - let response = - jsonrpc::ParsedResponse::from_response_data(response_data, web3_request.id()); - (StatusCode::OK, response.into()) - } Err(err) => { // max tries exceeded. return the error @@ -1628,11 +1612,11 @@ impl App { let x: SingleResponse = if let Some(data) = self.jsonrpc_response_cache.get(&cache_key).await { // it was cached! easy! - // TODO: wait. this currently panics. why? jsonrpc::ParsedResponse::from_response_data(data, web3_request.id()).into() } else if self.jsonrpc_response_failed_cache_keys.contains_key(&cache_key) { - // this is a cache_key that we know won't cache - // NOTICE! We do **NOT** use get which means the key's hotness is not updated. we don't use time-to-idler here so thats fine. but be careful if that changes + // this is a request that we have previously failed to cache. don't try the cache again + // TODO: is "contains_key" okay, or do we need "get($cache_key).await"? + // TODO: DRY. we do this timeout and try_proxy_connection below, too. timeout_at( web3_request.expire_at(), self.balanced_rpcs @@ -1641,109 +1625,59 @@ impl App { ) ).await?? } else { - // TODO: acquire a semaphore from a map with the cache key as the key - // TODO: try it, if that fails, then we are already running. wait until the semaphore completes and then run on. they will all go only one at a time though - // TODO: if we got the semaphore, do the try_get_with - // TODO: if the response is too big to cache mark the cache_key as not cacheable. maybe CacheMode can check that cache? + // we used to have a semaphore here, but its faster to just allow duplicate requests while the first is still in flight + // we might do some duplicate requests here, but it seems worth it to get rid of the Arc errors. + let response_data = timeout_at( + web3_request.expire_at(), + self.balanced_rpcs + .try_proxy_connection::>( + web3_request, + ) + ).await?; - let s = self.jsonrpc_response_semaphores.get_with(cache_key, async move { - Arc::new(Semaphore::new(1)) - }).await; + match response_data { + Ok(mut x) => { + match &x { + SingleResponse::Parsed(x) => { + // TODO: don't serialize here! we should already know the size! + let len = serde_json::to_string(&x).unwrap().len(); - // TODO: don't always do 1 second. use the median request latency instead - let mut x = match timeout(Duration::from_secs(1), s.acquire_owned()).await { - Err(_) => { - // TODO: should we try to cache this? whatever has the semaphore //should// handle that for us - timeout_at( - web3_request.expire_at(), - self.balanced_rpcs - .try_proxy_connection::>( - web3_request, - ) - ).await?? - } - Ok(_p) => { - // we got the permit! we are either first, or we were waiting a short time to get it in which case this response should be cached - // TODO: clone less? its spawned so i don't think we can - let f = { - let app = self.clone(); - let web3_request = web3_request.clone(); + if len <= max_response_cache_bytes { + let cached = ForwardedResponse::from(x.payload.clone()); - async move { - app - .jsonrpc_response_cache - .try_get_with::<_, Web3ProxyError>(cache_key, async { - // TODO: dynamic timeout based on whats left on web3_request - let response_data = timeout_at(web3_request.expire_at(), app.balanced_rpcs - .try_proxy_connection::>( - &web3_request, - )).await; - - match response_data { - Ok(response_data) => { - if !web3_request.cache_jsonrpc_errors() && let Err(err) = response_data { - // if we are not supposed to cache jsonrpc errors, - // then we must not convert Provider errors into a ForwardedResponse - // return all the errors now. moka will not cache Err results - Err(err) - } else { - // convert jsonrpc errors into ForwardedResponse, but leave the rest as errors - let response_data: ForwardedResponse> = response_data.try_into()?; - - if response_data.is_null() { - // don't ever cache "null" as a success. its too likely to be a problem - Err(Web3ProxyError::NullJsonRpcResult) - } else if response_data.num_bytes() > max_response_cache_bytes { - // don't cache really large requests - // TODO: emit a stat - Err(Web3ProxyError::JsonRpcResponse(response_data)) - } else { - // TODO: response data should maybe be Arc>>, but that's more work - Ok(response_data) - } - } - } - Err(err) => Err(Web3ProxyError::from(err)), - } - }).await - } - }; - - // this is spawned so that if the client disconnects, the app keeps polling the future with a lock inside the moka cache - // TODO: is this expect actually safe!? could there be a background process that still has the arc? - let x = match tokio::spawn(f).await? { - Ok(response_data) => Ok(jsonrpc::ParsedResponse::from_response_data(response_data, Default::default()).into()), - Err(err) => { - self.jsonrpc_response_failed_cache_keys.insert(cache_key, ()).await; - - if let Web3ProxyError::StreamResponse(x) = err.as_ref() { - if let Some(x) = x.lock().take() { - Ok(jsonrpc::SingleResponse::Stream(x)) - } else { - let method = web3_request.inner.method(); - let params = web3_request.inner.params(); - let err: Web3ProxyError = anyhow::anyhow!("stream was already taken. please report this in Discord. method={:?} params={:?}", method, params).into(); - - Err(Arc::new(err)) - } + self.jsonrpc_response_cache.insert(cache_key, cached).await; } else { - Err(err) + self.jsonrpc_response_failed_cache_keys.insert(cache_key, ()).await; } - }, - }; + } + SingleResponse::Stream(..) => { + self.jsonrpc_response_failed_cache_keys.insert(cache_key, ()).await; + } + } - let mut x = x?; - - // clear the id. theres no point including it in our cached response - x.set_id(Default::default()); + x.set_id(web3_request.id()); x } - }; + Err(err) => { + if web3_request.cache_jsonrpc_errors() { + // we got an error, but we are supposed to cache jsonrpc errors. + let x: Result>, Web3ProxyError> = err.try_into(); - x.set_id(web3_request.id()); + if x.is_err() { + // we still have an Err. it must not have been a jsonrpc error + self.jsonrpc_response_failed_cache_keys.insert(cache_key, ()).await; + } - x + // TODO: needing multiple into/try_into/from must be inefficient. investigate this + ParsedResponse::from_response_data(x?, web3_request.id()).into() + } else { + // we got an error, and we are not supposed to cache jsonrpc errors. exit early + self.jsonrpc_response_failed_cache_keys.insert(cache_key, ()).await; + return Err(err); + } + } + } }; x diff --git a/web3_proxy/src/app/ws.rs b/web3_proxy/src/app/ws.rs index ab6af228..7aaef131 100644 --- a/web3_proxy/src/app/ws.rs +++ b/web3_proxy/src/app/ws.rs @@ -123,8 +123,8 @@ impl App { let response_str = serde_json::to_string(&response_json) .expect("this should always be valid json"); - // we could use SingleForwardedResponseEnum::num_bytes() here, but since we already have the string, this is easier - let response_bytes = response_str.len(); + // we could use ForwardedResponse::num_bytes() here, but since we already have the string, this is easier + let response_bytes = response_str.len() as u64; // TODO: do clients support binary messages? // TODO: can we check a content type header? @@ -212,8 +212,8 @@ impl App { let response_str = serde_json::to_string(&response_json) .expect("this should always be valid json"); - // we could use SingleForwardedResponseEnum::num_bytes() here, but since we already have the string, this is easier - let response_bytes = response_str.len(); + // we could use ForwardedResponse::num_bytes() here, but since we already have the string, this is easier + let response_bytes = response_str.len() as u64; subscription_web3_request.add_response(response_bytes); diff --git a/web3_proxy/src/errors.rs b/web3_proxy/src/errors.rs index 1a6c3dff..f9a8653b 100644 --- a/web3_proxy/src/errors.rs +++ b/web3_proxy/src/errors.rs @@ -1,7 +1,7 @@ //! Utlities for logging errors for admins and displaying errors to users. use crate::frontend::authorization::Authorization; -use crate::jsonrpc::{self, JsonRpcErrorData, ParsedResponse}; +use crate::jsonrpc::{self, JsonRpcErrorData, ParsedResponse, StreamResponse}; use crate::response_cache::ForwardedResponse; use crate::rpcs::provider::EthersHttpProvider; use axum::extract::rejection::JsonRejection; @@ -19,7 +19,6 @@ use http::header::InvalidHeaderValue; use http::uri::InvalidUri; use ipnet::AddrParseError; use migration::sea_orm::DbErr; -use parking_lot::Mutex; use redis_rate_limiter::redis::RedisError; use redis_rate_limiter::RedisPoolError; use reqwest::header::ToStrError; @@ -136,16 +135,6 @@ pub enum Web3ProxyError { #[from(ignore)] MethodNotFound(Cow<'static, str>), NoVolatileRedisDatabase, - /// make it easy to skip caching large results - #[error(ignore)] - #[display(fmt = "{:?}", _0)] - JsonRpcResponse(ForwardedResponse>), - /// make it easy to skip caching streaming results - #[error(ignore)] - #[display(fmt = "{:?}", _0)] - StreamResponse(Mutex>>>), - /// make it easy to skip caching null results - NullJsonRpcResult, OriginRequired, #[error(ignore)] #[from(ignore)] @@ -171,6 +160,9 @@ pub enum Web3ProxyError { /// simple way to return an error message to the user and an anyhow to our logs #[display(fmt = "{}, {}, {:?}", _0, _1, _2)] StatusCode(StatusCode, Cow<'static, str>, Option), + #[display(fmt = "streaming response")] + #[error(ignore)] + StreamResponse(StreamResponse>), #[cfg(feature = "stripe")] StripeWebhookError(stripe::WebhookError), /// TODO: what should be attached to the timout? @@ -828,17 +820,6 @@ impl Web3ProxyError { }, ) } - Self::JsonRpcResponse(response_enum) => { - // TODO: shame we have to clone, but its an Arc so its not terrible - return (StatusCode::OK, response_enum.clone()); - } - Self::StreamResponse(_resp) => { - // TODO: better way of doing this? - unreachable!("stream is pulled out, not used here"); - } - Self::NullJsonRpcResult => { - return (StatusCode::OK, ForwardedResponse::NullResult); - } Self::OriginRequired => { trace!("OriginRequired"); ( @@ -1046,6 +1027,10 @@ impl Web3ProxyError { }, ) } + Self::StreamResponse(..) => { + // TODO: should it really? + unimplemented!("streaming should be handled elsewhere"); + } #[cfg(feature = "stripe")] Self::StripeWebhookError(err) => { trace!(?err, "StripeWebhookError"); diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 9f6e49c7..c22af877 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -179,27 +179,21 @@ pub enum ResponseOrBytes<'a> { Json(&'a serde_json::Value), Response(&'a jsonrpc::SingleResponse), Error(&'a Web3ProxyError), - Bytes(usize), -} - -impl<'a> From for ResponseOrBytes<'a> { - fn from(value: u64) -> Self { - Self::Bytes(value as usize) - } + Bytes(u64), } impl ResponseOrBytes<'_> { - pub fn num_bytes(&self) -> usize { + pub fn num_bytes(&self) -> u64 { match self { Self::Json(x) => serde_json::to_string(x) .expect("this should always serialize") - .len(), + .len() as u64, Self::Response(x) => x.num_bytes(), Self::Bytes(num_bytes) => *num_bytes, Self::Error(x) => { let (_, x) = x.as_response_parts(); - x.num_bytes() as usize + x.num_bytes() } } } diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index f7bb77ca..39686ed3 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -397,8 +397,8 @@ async fn websocket_proxy_web3_rpc( let response = jsonrpc::ParsedResponse::from_value(json!(partial_response), web3_request.id()); - // TODO: better way of passing in ParsedResponse let response = jsonrpc::SingleResponse::Parsed(response); + web3_request.add_response(&response); let response = response.parsed().await.expect("Response already parsed"); diff --git a/web3_proxy/src/jsonrpc/error.rs b/web3_proxy/src/jsonrpc/error.rs index faf3214b..e63b471a 100644 --- a/web3_proxy/src/jsonrpc/error.rs +++ b/web3_proxy/src/jsonrpc/error.rs @@ -3,7 +3,7 @@ use std::borrow::Cow; // TODO: impl Error on this? /// All jsonrpc errors use this structure -#[derive(Debug, Deserialize, Serialize, Clone)] +#[derive(Clone, Debug, Deserialize, Serialize)] pub struct JsonRpcErrorData { /// The error code pub code: i64, @@ -15,16 +15,16 @@ pub struct JsonRpcErrorData { } impl JsonRpcErrorData { - pub fn num_bytes(&self) -> usize { + pub fn num_bytes(&self) -> u64 { serde_json::to_string(self) .expect("should always serialize") - .len() + .len() as u64 } - pub fn is_retryable(&self) -> bool { - // TODO: move stuff from request to here - todo!() - } + // pub fn is_retryable(&self) -> bool { + // // TODO: move stuff from request to here + // todo!() + // } } impl From<&'static str> for JsonRpcErrorData { diff --git a/web3_proxy/src/jsonrpc/request_builder.rs b/web3_proxy/src/jsonrpc/request_builder.rs index 9f46b964..610d42e2 100644 --- a/web3_proxy/src/jsonrpc/request_builder.rs +++ b/web3_proxy/src/jsonrpc/request_builder.rs @@ -554,7 +554,7 @@ impl ValidatedRequest { // TODO: fetch? set? should it be None in a Mutex? or a OnceCell? let response = response.into(); - let num_bytes = response.num_bytes() as u64; + let num_bytes = response.num_bytes(); self.response_bytes .fetch_add(num_bytes, atomic::Ordering::Relaxed); diff --git a/web3_proxy/src/jsonrpc/response.rs b/web3_proxy/src/jsonrpc/response.rs index fd1ad868..d53da85b 100644 --- a/web3_proxy/src/jsonrpc/response.rs +++ b/web3_proxy/src/jsonrpc/response.rs @@ -6,6 +6,7 @@ use axum::body::StreamBody; use axum::response::IntoResponse; use axum::Json; use bytes::{Bytes, BytesMut}; +use derivative::Derivative; use futures_util::stream::{self, StreamExt}; use futures_util::TryStreamExt; use serde::{de, Deserialize, Serialize}; @@ -19,7 +20,7 @@ pub trait JsonRpcResultData = serde::Serialize + serde::de::DeserializeOwned + f /// TODO: borrow values to avoid allocs if possible /// TODO: lots of overlap with `SingleForwardedResponse` -#[derive(Debug, Serialize)] +#[derive(Clone, Debug, Serialize)] pub struct ParsedResponse> { pub jsonrpc: String, pub id: Box, @@ -193,30 +194,32 @@ where } } -#[derive(Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize)] #[serde(untagged)] pub enum ResponsePayload { Success { result: T }, Error { error: JsonRpcErrorData }, } -#[derive(Debug)] +#[derive(Derivative)] +#[derivative(Debug)] pub struct StreamResponse { _t: PhantomData, buffer: Bytes, + num_bytes: Option, + #[derivative(Debug = "ignore")] response: reqwest::Response, web3_request: Arc, } impl StreamResponse { - // TODO: error handing pub async fn read(self) -> Web3ProxyResult> where T: de::DeserializeOwned, { let mut buffer = BytesMut::with_capacity(self.buffer.len()); - buffer.extend_from_slice(&self.buffer); - buffer.extend_from_slice(&self.response.bytes().await?); + buffer.extend(self.buffer); + buffer.extend(self.response.bytes().await?); let parsed = serde_json::from_slice(&buffer)?; Ok(parsed) } @@ -227,7 +230,7 @@ impl IntoResponse for StreamResponse { let stream = stream::once(async { Ok::<_, reqwest::Error>(self.buffer) }) .chain(self.response.bytes_stream()) .map_ok(move |x| { - let len = x.len(); + let len = x.len() as u64; self.web3_request.add_response(len); @@ -241,6 +244,8 @@ impl IntoResponse for StreamResponse { #[derive(Debug)] pub enum SingleResponse> { /// TODO: save the size here so we don't have to serialize again + /// TODO: before doing that, make sure we don't swap back and forth between parsed and stream and single and forwarded and end up serializing too many times + /// TODO: should this be a ForwardedResponse instead of ParsedResponse Parsed(ParsedResponse), Stream(StreamResponse), } @@ -249,6 +254,13 @@ impl SingleResponse where T: de::DeserializeOwned + Serialize, { + pub fn is_jsonrpc_err(&self) -> bool { + match self { + Self::Parsed(resp, ..) => matches!(resp.payload, ResponsePayload::Error { .. }), + Self::Stream(..) => false, + } + } + // TODO: threshold from configs // TODO: error handling // TODO: if a large stream's response's initial chunk "error" then we should buffer it @@ -261,19 +273,21 @@ where // short Some(len) if len <= nbytes => Ok(Self::from_bytes(response.bytes().await?)?), // long - Some(_) => Ok(Self::Stream(StreamResponse { + Some(len) => Ok(Self::Stream(StreamResponse { _t: PhantomData::, buffer: Bytes::new(), + num_bytes: Some(len), response, web3_request: web3_request.clone(), })), // unknown length. maybe compressed. maybe streaming. maybe both None => { - let mut buffer = BytesMut::new(); + // todo: this might over-allocate, but it's probably fine + let mut buffer = BytesMut::with_capacity(nbytes as usize); while (buffer.len() as u64) < nbytes { match response.chunk().await? { Some(chunk) => { - buffer.extend_from_slice(&chunk); + buffer.extend(chunk); } None => { // it was short @@ -287,6 +301,7 @@ where Ok(Self::Stream(StreamResponse { _t: PhantomData::, buffer, + num_bytes: None, response, web3_request: web3_request.clone(), })) @@ -302,26 +317,23 @@ where // TODO: error handling pub async fn parsed(self) -> Web3ProxyResult> { match self { - Self::Parsed(resp) => Ok(resp), - Self::Stream(resp) => resp.read().await, + Self::Parsed(resp, ..) => Ok(resp), + Self::Stream(resp, ..) => resp.read().await, } } - pub fn num_bytes(&self) -> usize { + pub fn num_bytes(&self) -> u64 { match self { Self::Parsed(response) => serde_json::to_string(response) .expect("this should always serialize") - .len(), - Self::Stream(response) => match response.response.content_length() { - Some(len) => len as usize, - None => 0, - }, + .len() as u64, + Self::Stream(response) => response.num_bytes.unwrap_or(0), } } pub fn set_id(&mut self, id: Box) { match self { - SingleResponse::Parsed(x) => { + SingleResponse::Parsed(x, ..) => { x.id = id; } SingleResponse::Stream(..) => { @@ -343,8 +355,8 @@ where { fn into_response(self) -> axum::response::Response { match self { - Self::Parsed(resp) => Json(resp).into_response(), - Self::Stream(resp) => resp.into_response(), + Self::Parsed(resp, ..) => Json(resp).into_response(), + Self::Stream(resp, ..) => resp.into_response(), } } } diff --git a/web3_proxy/src/response_cache.rs b/web3_proxy/src/response_cache.rs index b25c58f9..b1ca9286 100644 --- a/web3_proxy/src/response_cache.rs +++ b/web3_proxy/src/response_cache.rs @@ -2,7 +2,7 @@ use crate::{ block_number::{BlockNumAndHash, CacheMode}, errors::{Web3ProxyError, Web3ProxyResult}, frontend::authorization::RequestOrMethod, - jsonrpc::{self, JsonRpcErrorData}, + jsonrpc::{self, JsonRpcErrorData, ResponsePayload}, }; use derive_more::From; use ethers::{ @@ -11,7 +11,6 @@ use ethers::{ }; use hashbrown::hash_map::DefaultHashBuilder; use moka::future::Cache; -use parking_lot::Mutex; use serde_json::value::RawValue; use std::{ hash::{BuildHasher, Hash, Hasher}, @@ -86,26 +85,25 @@ impl<'a> JsonRpcQueryCacheKey<'a> { } } -// TODO: i think if we change this to Arc>>, we can speed things up pub type JsonRpcResponseCache = Cache>>; -/// TODO: we might need one that holds RawValue and one that holds serde_json::Value +/// TODO: think about this more. there is a lot of overlap with ParsedResponse #[derive(Clone, Debug)] -pub enum ForwardedResponse { +pub enum ForwardedResponse { NullResult, Result { - value: R, - num_bytes: u32, + value: T, + num_bytes: u64, }, RpcError { error_data: JsonRpcErrorData, - num_bytes: u32, + num_bytes: u64, }, } // TODO: impl for other inner result types? impl ForwardedResponse { - pub fn num_bytes(&self) -> u32 { + pub fn num_bytes(&self) -> u64 { match self { Self::NullResult => 1, Self::Result { num_bytes, .. } => *num_bytes, @@ -138,31 +136,52 @@ impl ForwardedResponse> { } } +impl From>> for ForwardedResponse> { + fn from(value: ResponsePayload>) -> Self { + match value { + ResponsePayload::Success { result } => { + let num_bytes = result.get().len() as u64; + + ForwardedResponse::Result { + value: result, + num_bytes, + } + } + ResponsePayload::Error { error } => { + let num_bytes = error.num_bytes(); + + ForwardedResponse::RpcError { + error_data: error, + num_bytes, + } + } + } + } +} + impl TryFrom> for ForwardedResponse> { type Error = Web3ProxyError; fn try_from(response: Web3ProxyResult) -> Result { - match response { - Ok(jsonrpc::SingleResponse::Parsed(parsed)) => match parsed.payload { + match response? { + jsonrpc::SingleResponse::Parsed(parsed) => match parsed.payload { jsonrpc::ResponsePayload::Success { result } => { - let num_bytes = result.get().len() as u32; + let num_bytes = result.get().len() as u64; + Ok(ForwardedResponse::Result { value: result, num_bytes, }) } jsonrpc::ResponsePayload::Error { error } => { - let num_bytes = error.num_bytes() as u32; + let num_bytes = error.num_bytes(); + Ok(ForwardedResponse::RpcError { error_data: error, - // TODO: this double serializes num_bytes, }) } }, - Ok(jsonrpc::SingleResponse::Stream(stream)) => { - Err(Web3ProxyError::StreamResponse(Mutex::new(Some(stream)))) - } - Err(err) => err.try_into(), + jsonrpc::SingleResponse::Stream(stream) => Err(Web3ProxyError::StreamResponse(stream)), } } } @@ -177,9 +196,7 @@ impl From for ForwardedResponse> { impl From> for ForwardedResponse> { fn from(value: Arc) -> Self { - let num_bytes = value.get().len(); - - let num_bytes = num_bytes as u32; + let num_bytes = value.get().len() as u64; Self::Result { value, num_bytes } } @@ -187,13 +204,9 @@ impl From> for ForwardedResponse> { impl From> for ForwardedResponse> { fn from(value: Box) -> Self { - let num_bytes = value.get().len(); + let value: Arc = value.into(); - let num_bytes = num_bytes as u32; - - let value = value.into(); - - Self::Result { value, num_bytes } + value.into() } } @@ -206,8 +219,6 @@ impl TryFrom for ForwardedResponse> { Ok(x) => Ok(x.into()), Err(..) => Err(err.into()), }, - Web3ProxyError::NullJsonRpcResult => Ok(ForwardedResponse::NullResult), - Web3ProxyError::JsonRpcResponse(x) => Ok(x), Web3ProxyError::JsonRpcErrorData(err) => Ok(err.into()), err => Err(err), } @@ -228,27 +239,11 @@ impl TryFrom, Web3ProxyError>> for ForwardedResponse, Web3ProxyError>> for ForwardedResponse> { - type Error = Web3ProxyError; - - fn try_from(value: Result, Web3ProxyError>) -> Result { - match value { - Ok(x) => Ok(x.into()), - Err(err) => { - let x: Self = err.try_into()?; - - Ok(x) - } - } - } -} impl From for ForwardedResponse { fn from(value: JsonRpcErrorData) -> Self { // TODO: wrap the error in a complete response? - let num_bytes = serde_json::to_string(&value).unwrap().len(); - - let num_bytes = num_bytes as u32; + let num_bytes = serde_json::to_string(&value).unwrap().len() as u64; Self::RpcError { error_data: value, @@ -317,14 +312,16 @@ impl<'a> TryFrom<&'a WsClientError> for JsonRpcErrorData { pub struct JsonRpcResponseWeigher(pub u32); impl JsonRpcResponseWeigher { - pub fn weigh(&self, _key: &K, value: &ForwardedResponse) -> u32 { - let x = value.num_bytes(); - - if x > self.0 { - // return max. the item may start to be inserted into the cache, but it will be immediatly removed - u32::MAX + pub fn weigh(&self, _key: &K, value: &ForwardedResponse) -> u32 { + if let Ok(x) = value.num_bytes().try_into() { + if x > self.0 { + // return max. the item may start to be inserted into the cache, but it will be immediatly removed + u32::MAX + } else { + x + } } else { - x + u32::MAX } } } @@ -346,21 +343,21 @@ mod tests { let small_data: ForwardedResponse> = ForwardedResponse::Result { value: Box::::default().into(), - num_bytes: max_item_weight / 2, + num_bytes: (max_item_weight / 2) as u64, }; assert_eq!(weigher.weigh(&(), &small_data), max_item_weight / 2); let max_sized_data: ForwardedResponse> = ForwardedResponse::Result { value: Box::::default().into(), - num_bytes: max_item_weight, + num_bytes: max_item_weight as u64, }; assert_eq!(weigher.weigh(&(), &max_sized_data), max_item_weight); let oversized_data: ForwardedResponse> = ForwardedResponse::Result { value: Box::::default().into(), - num_bytes: max_item_weight * 2, + num_bytes: (max_item_weight * 2) as u64, }; assert_eq!(weigher.weigh(&(), &oversized_data), u32::MAX); diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index f0bcc86c..12189556 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -340,7 +340,7 @@ impl OpenRequestHandle { // true if we got a jsonrpc result. a jsonrpc error or other error is false. // TODO: counters for errors vs jsonrpc vs success? let response_is_success = match &response { - Ok(jsonrpc::SingleResponse::Parsed(x)) => { + Ok(jsonrpc::SingleResponse::Parsed(x, ..)) => { matches!(&x.payload, ResponsePayload::Success { .. }) } Ok(jsonrpc::SingleResponse::Stream(..)) => true, @@ -368,7 +368,7 @@ impl OpenRequestHandle { } let response_type: ResponseType = match &response { - Ok(jsonrpc::SingleResponse::Parsed(x)) => match &x.payload { + Ok(jsonrpc::SingleResponse::Parsed(x, ..)) => match &x.payload { ResponsePayload::Success { .. } => unreachable!(), ResponsePayload::Error { error } => { trace!(?error, "jsonrpc error data");