diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 67f624fd..2a403758 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -6,11 +6,11 @@ use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::frontend::authorization::{Authorization, Web3Request}; use crate::globals::{global_db_conn, DatabaseError, APP, DB_CONN, DB_REPLICA}; use crate::jsonrpc::{ - self, JsonRpcErrorData, JsonRpcId, JsonRpcParams, JsonRpcRequest, JsonRpcRequestEnum, - JsonRpcResultData, SingleResponse, + self, JsonRpcErrorData, JsonRpcParams, JsonRpcRequestEnum, JsonRpcResultData, LooseId, + SingleRequest, SingleResponse, }; use crate::relational_db::{connect_db, migrate_db}; -use crate::response_cache::{JsonRpcResponseCache, JsonRpcResponseEnum, JsonRpcResponseWeigher}; +use crate::response_cache::{ForwardedResponse, JsonRpcResponseCache, JsonRpcResponseWeigher}; use crate::rpcs::blockchain::Web3ProxyBlock; use crate::rpcs::consensus::RankedRpcs; use crate::rpcs::many::Web3Rpcs; @@ -983,14 +983,16 @@ impl Web3ProxyApp { authorization: Arc, ) -> Web3ProxyResult { // TODO: proper ids - let request = JsonRpcRequest::new(JsonRpcId::Number(1), method.to_string(), json!(params))?; + let request = SingleRequest::new(LooseId::Number(1), method.to_string(), json!(params))?; let (_, response, _) = self.proxy_request(request, authorization, None).await; // TODO: error handling? match response.parsed().await?.payload { - jsonrpc::Payload::Success { result } => Ok(serde_json::from_str(result.get())?), - jsonrpc::Payload::Error { error } => Err(Web3ProxyError::JsonRpcErrorData(error)), + jsonrpc::ResponsePayload::Success { result } => Ok(serde_json::from_str(result.get())?), + jsonrpc::ResponsePayload::Error { error } => { + Err(Web3ProxyError::JsonRpcErrorData(error)) + } } } @@ -1028,7 +1030,7 @@ impl Web3ProxyApp { async fn proxy_web3_rpc_requests( self: &Arc, authorization: &Arc, - requests: Vec, + requests: Vec, ) -> Web3ProxyResult<(Vec, Vec>)> { // TODO: we should probably change ethers-rs to support this directly. they pushed this off to v2 though let num_requests = requests.len(); @@ -1109,7 +1111,7 @@ impl Web3ProxyApp { /// proxy request with up to 3 tries. async fn proxy_request( self: &Arc, - request: JsonRpcRequest, + request: SingleRequest, authorization: Arc, head_block: Option, ) -> (StatusCode, jsonrpc::SingleResponse, Vec>) { @@ -1430,7 +1432,7 @@ impl Web3ProxyApp { // sometimes we get an error that the transaction is already known by our nodes, // that's not really an error. Return the hash like a successful response would. // TODO: move this to a helper function. probably part of try_send_protected - if let JsonRpcResponseEnum::RpcError{ error_data, ..} = &response { + if let ForwardedResponse::RpcError{ error_data, ..} = &response { if error_data.code == -32000 && (error_data.message == "ALREADY_EXISTS: already known" || error_data.message == "INTERNAL_ERROR: existing tx with same hash") @@ -1466,7 +1468,7 @@ impl Web3ProxyApp { trace!("tx_hash: {:#?}", tx_hash); - response = JsonRpcResponseEnum::from(tx_hash); + response = ForwardedResponse::from(tx_hash); } } } @@ -1477,7 +1479,7 @@ impl Web3ProxyApp { // TODO: use this cache to avoid sending duplicate transactions? // TODO: different salt for ips and transactions? if let Some(ref salt) = self.config.public_recent_ips_salt { - if let JsonRpcResponseEnum::Result { value, .. } = &response { + if let ForwardedResponse::Result { value, .. } = &response { let now = Utc::now().timestamp(); let app = self.clone(); @@ -1671,12 +1673,12 @@ impl Web3ProxyApp { Ok(response_data) => { if !web3_request.cache_jsonrpc_errors() && let Err(err) = response_data { // if we are not supposed to cache jsonrpc errors, - // then we must not convert Provider errors into a JsonRpcResponseEnum + // then we must not convert Provider errors into a ForwardedResponse // return all the errors now. moka will not cache Err results Err(err) } else { - // convert jsonrpc errors into JsonRpcResponseEnum, but leave the rest as errors - let response_data: JsonRpcResponseEnum> = response_data.try_into()?; + // convert jsonrpc errors into ForwardedResponse, but leave the rest as errors + let response_data: ForwardedResponse> = response_data.try_into()?; if response_data.is_null() { // don't ever cache "null" as a success. its too likely to be a problem @@ -1686,7 +1688,7 @@ impl Web3ProxyApp { // TODO: emit a stat Err(Web3ProxyError::JsonRpcResponse(response_data)) } else { - // TODO: response data should maybe be Arc>>, but that's more work + // TODO: response data should maybe be Arc>>, but that's more work Ok(response_data) } } diff --git a/web3_proxy/src/app/ws.rs b/web3_proxy/src/app/ws.rs index eca58d01..db11e579 100644 --- a/web3_proxy/src/app/ws.rs +++ b/web3_proxy/src/app/ws.rs @@ -4,7 +4,7 @@ use super::Web3ProxyApp; use crate::errors::{Web3ProxyError, Web3ProxyResult}; use crate::frontend::authorization::{RequestOrMethod, Web3Request}; use crate::jsonrpc; -use crate::response_cache::JsonRpcResponseEnum; +use crate::response_cache::ForwardedResponse; use axum::extract::ws::{CloseFrame, Message}; use deferred_rate_limiter::DeferredRateLimitResult; use ethers::types::U64; @@ -107,7 +107,7 @@ impl Web3ProxyApp { break; } - // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id + // TODO: make a struct for this? using our SingleForwardedResponse won't work because it needs an id let response_json = json!({ "jsonrpc": "2.0", "method":"eth_subscription", @@ -121,7 +121,7 @@ impl Web3ProxyApp { let response_str = serde_json::to_string(&response_json) .expect("this should always be valid json"); - // we could use JsonRpcForwardedResponseEnum::num_bytes() here, but since we already have the string, this is easier + // we could use SingleForwardedResponseEnum::num_bytes() here, but since we already have the string, this is easier let response_bytes = response_str.len(); // TODO: do clients support binary messages? @@ -208,7 +208,7 @@ impl Web3ProxyApp { let response_str = serde_json::to_string(&response_json) .expect("this should always be valid json"); - // we could use JsonRpcForwardedResponseEnum::num_bytes() here, but since we already have the string, this is easier + // we could use SingleForwardedResponseEnum::num_bytes() here, but since we already have the string, this is easier let response_bytes = response_str.len(); subscription_web3_request.add_response(response_bytes); @@ -246,7 +246,7 @@ impl Web3ProxyApp { // TODO: do something with subscription_join_handle? - let response_data = JsonRpcResponseEnum::from(json!(subscription_id)); + let response_data = ForwardedResponse::from(json!(subscription_id)); let response = jsonrpc::ParsedResponse::from_response_data(response_data, web3_request.id()); diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index 2ed7578d..fb89bebb 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -1,6 +1,6 @@ //! Helper functions for turning ether's BlockNumber into numbers and updating incoming queries to match. use crate::app::Web3ProxyApp; -use crate::jsonrpc::JsonRpcRequest; +use crate::jsonrpc::SingleRequest; use crate::{ errors::{Web3ProxyError, Web3ProxyResult}, rpcs::blockchain::Web3ProxyBlock, @@ -260,12 +260,12 @@ impl CacheMode { /// like `try_new`, but instead of erroring, it will default to caching with the head block /// returns None if this request should not be cached pub async fn new<'a>( - request: &'a mut JsonRpcRequest, + request: &'a mut SingleRequest, head_block: Option<&'a Web3ProxyBlock>, app: Option<&'a Web3ProxyApp>, ) -> Self { match Self::try_new(request, head_block, app).await { - Ok(x) => x, + Ok(x) => return x, Err(Web3ProxyError::NoBlocksKnown) => { warn!( method = %request.method, @@ -294,7 +294,7 @@ impl CacheMode { } pub async fn try_new( - request: &mut JsonRpcRequest, + request: &mut SingleRequest, head_block: Option<&Web3ProxyBlock>, app: Option<&Web3ProxyApp>, ) -> Web3ProxyResult { @@ -519,7 +519,7 @@ mod test { use super::CacheMode; use crate::{ errors::Web3ProxyError, - jsonrpc::{JsonRpcId, JsonRpcRequest}, + jsonrpc::{LooseId, SingleRequest}, rpcs::blockchain::Web3ProxyBlock, }; use ethers::types::{Block, H256}; @@ -539,9 +539,9 @@ mod test { let head_block = Web3ProxyBlock::try_new(Arc::new(head_block)).unwrap(); - let id = JsonRpcId::Number(9); + let id = LooseId::Number(9); - let mut request = JsonRpcRequest::new(id, method.to_string(), params).unwrap(); + let mut request = SingleRequest::new(id, method.to_string(), params).unwrap(); // TODO: instead of empty, check None? let x = CacheMode::try_new(&mut request, Some(&head_block), None) @@ -574,9 +574,9 @@ mod test { let head_block = Web3ProxyBlock::try_new(Arc::new(head_block)).unwrap(); - let id = JsonRpcId::Number(99); + let id = LooseId::Number(99); - let mut request = JsonRpcRequest::new(id, method.to_string(), params).unwrap(); + let mut request = SingleRequest::new(id, method.to_string(), params).unwrap(); let x = CacheMode::try_new(&mut request, Some(&head_block), None) .await @@ -611,7 +611,7 @@ mod test { let head_block = Web3ProxyBlock::try_new(Arc::new(head_block)).unwrap(); - let mut request = JsonRpcRequest::new(99.into(), method.to_string(), params).unwrap(); + let mut request = SingleRequest::new(99.into(), method.to_string(), params).unwrap(); let x = CacheMode::try_new(&mut request, Some(&head_block), None) .await diff --git a/web3_proxy/src/errors.rs b/web3_proxy/src/errors.rs index 700ca34b..02d729c6 100644 --- a/web3_proxy/src/errors.rs +++ b/web3_proxy/src/errors.rs @@ -1,8 +1,8 @@ //! Utlities for logging errors for admins and displaying errors to users. use crate::frontend::authorization::Authorization; -use crate::jsonrpc::{self, JsonRpcErrorData, JsonRpcForwardedResponse}; -use crate::response_cache::JsonRpcResponseEnum; +use crate::jsonrpc::{self, JsonRpcErrorData, ParsedResponse}; +use crate::response_cache::ForwardedResponse; use crate::rpcs::provider::EthersHttpProvider; use axum::extract::rejection::JsonRejection; use axum::extract::ws::Message; @@ -139,11 +139,11 @@ pub enum Web3ProxyError { /// make it easy to skip caching large results #[error(ignore)] #[display(fmt = "{:?}", _0)] - JsonRpcResponse(JsonRpcResponseEnum>), + JsonRpcResponse(ForwardedResponse>), /// make it easy to skip caching streaming results #[error(ignore)] #[display(fmt = "{:?}", _0)] - StreamResponse(Mutex>), + StreamResponse(Mutex>>>), /// make it easy to skip caching null results NullJsonRpcResult, OriginRequired, @@ -215,7 +215,7 @@ impl Web3ProxyError { /// turn the error into an axum response. /// /// TODO? change to `to_response_parts(self)` - pub fn as_response_parts(&self) -> (StatusCode, JsonRpcResponseEnum>) { + pub fn as_response_parts(&self) -> (StatusCode, ForwardedResponse>) { // TODO: include a unique request id in the data let (code, err): (StatusCode, JsonRpcErrorData) = match self { Self::Abi(err) => { @@ -846,7 +846,7 @@ impl Web3ProxyError { unreachable!("stream is pulled out, not used here"); } Self::NullJsonRpcResult => { - return (StatusCode::OK, JsonRpcResponseEnum::NullResult); + return (StatusCode::OK, ForwardedResponse::NullResult); } Self::OriginRequired => { trace!("OriginRequired"); @@ -1231,7 +1231,7 @@ impl Web3ProxyError { }, }; - (code, JsonRpcResponseEnum::from(err)) + (code, ForwardedResponse::from(err)) } #[inline] @@ -1240,7 +1240,7 @@ impl Web3ProxyError { let id = id.unwrap_or_default(); - let response = JsonRpcForwardedResponse::from_response_data(response_data, id); + let response = ParsedResponse::from_response_data(response_data, id); (status_code, Json(response)).into_response() } @@ -1310,7 +1310,7 @@ impl Web3ProxyError { let id = id.unwrap_or_default(); - let err = JsonRpcForwardedResponse::from_response_data(err, id); + let err = ParsedResponse::from_response_data(err, id); let msg = serde_json::to_string(&err).expect("errors should always serialize to json"); diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index d697c532..6c9076c5 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -7,7 +7,7 @@ use crate::block_number::CacheMode; use crate::caches::RegisteredUserRateLimitKey; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::globals::{global_db_replica_conn, APP}; -use crate::jsonrpc::{self, JsonRpcId, JsonRpcParams, JsonRpcRequest}; +use crate::jsonrpc::{self, JsonRpcParams, LooseId, SingleRequest}; use crate::kafka::KafkaDebugLogger; use crate::response_cache::JsonRpcQueryCacheKey; use crate::rpcs::blockchain::Web3ProxyBlock; @@ -131,7 +131,7 @@ impl Hash for RpcSecretKey { #[derive(Debug, Default, From, Serialize)] pub enum RequestOrMethod { - Request(JsonRpcRequest), + Request(SingleRequest), /// sometimes we don't have a full request. for example, when we are logging a websocket subscription Method(Cow<'static, str>, usize), #[default] @@ -281,7 +281,7 @@ impl RequestOrMethod { } } - pub fn jsonrpc_request(&self) -> Option<&JsonRpcRequest> { + pub fn jsonrpc_request(&self) -> Option<&SingleRequest> { match self { Self::Request(x) => Some(x), _ => None, @@ -443,10 +443,10 @@ impl Web3Request { let authorization = Arc::new(Authorization::internal().unwrap()); // TODO: we need a real id! increment a counter on the app - let id = JsonRpcId::Number(1); + let id = LooseId::Number(1); // TODO: this seems inefficient - let request = JsonRpcRequest::new(id, method, json!(params)).unwrap(); + let request = SingleRequest::new(id, method, json!(params)).unwrap(); if let Some(app) = APP.get() { Self::new_with_app(app, authorization, max_wait, request.into(), head_block).await diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index a2b76d8d..274f5442 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -4,12 +4,8 @@ use super::authorization::{ip_is_authorized, key_is_authorized, Authorization, Web3Request}; use crate::errors::{Web3ProxyError, Web3ProxyResponse}; -use crate::jsonrpc; -use crate::{ - app::Web3ProxyApp, - errors::Web3ProxyResult, - jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}, -}; +use crate::jsonrpc::{self, ParsedResponse}; +use crate::{app::Web3ProxyApp, errors::Web3ProxyResult, jsonrpc::SingleRequest}; use axum::headers::{Origin, Referer, UserAgent}; use axum::{ extract::ws::{Message, WebSocket, WebSocketUpgrade}, @@ -320,7 +316,7 @@ async fn proxy_web3_socket( async fn websocket_proxy_web3_rpc( app: &Arc, authorization: Arc, - json_request: JsonRpcRequest, + json_request: SingleRequest, response_sender: &mpsc::Sender, subscription_count: &AtomicU64, subscriptions: &AsyncRwLock>, @@ -337,7 +333,7 @@ async fn websocket_proxy_web3_rpc( .await { Ok((handle, response)) => { - if let jsonrpc::Payload::Success { + if let jsonrpc::ResponsePayload::Success { result: ref subscription_id, } = response.payload { @@ -417,7 +413,7 @@ async fn handle_socket_payload( let (authorization, semaphore) = authorization.check_again(app).await?; // TODO: handle batched requests - let (response_id, response) = match serde_json::from_str::(payload) { + let (response_id, response) = match serde_json::from_str::(payload) { Ok(json_request) => { let request_id = json_request.id.clone(); @@ -442,7 +438,7 @@ async fn handle_socket_payload( Err(err) => { let (_, response_data) = err.as_response_parts(); - let response = JsonRpcForwardedResponse::from_response_data(response_data, response_id); + let response = ParsedResponse::from_response_data(response_data, response_id); serde_json::to_string(&response).expect("to_string should always work here") } diff --git a/web3_proxy/src/jsonrpc.rs b/web3_proxy/src/jsonrpc.rs deleted file mode 100644 index faa682db..00000000 --- a/web3_proxy/src/jsonrpc.rs +++ /dev/null @@ -1,860 +0,0 @@ -use axum::body::StreamBody; -use axum::response::{IntoResponse, Response as AxumResponse}; -use axum::Json; -use bytes::{Bytes, BytesMut}; -use derive_more::From; -use futures_util::stream::{self, StreamExt}; -use futures_util::TryStreamExt; -use serde::de::{self, Deserializer, MapAccess, SeqAccess, Visitor}; -use serde::{Deserialize, Serialize}; -use serde_inline_default::serde_inline_default; -use serde_json::json; -use serde_json::value::{to_raw_value, RawValue}; -use std::borrow::Cow; -use std::fmt; -use std::marker::PhantomData; -use std::sync::{atomic, Arc}; -use std::time::Duration; -use tokio::time::sleep; - -use crate::app::Web3ProxyApp; -use crate::errors::{Web3ProxyError, Web3ProxyResult}; -use crate::frontend::authorization::{Authorization, RequestOrMethod, Web3Request}; -use crate::response_cache::JsonRpcResponseEnum; - -pub trait JsonRpcParams = fmt::Debug + serde::Serialize + Send + Sync + 'static; -pub trait JsonRpcResultData = serde::Serialize + serde::de::DeserializeOwned + fmt::Debug + Send; - -/// TODO: borrow values to avoid allocs if possible -#[derive(Debug, Serialize)] -pub struct ParsedResponse> { - pub jsonrpc: String, - pub id: Box, - #[serde(flatten)] - pub payload: Payload, -} - -impl ParsedResponse { - pub fn from_value(value: serde_json::Value, id: Box) -> Self { - let result = serde_json::value::to_raw_value(&value) - .expect("this should not fail") - .into(); - Self::from_result(result, id) - } -} - -impl ParsedResponse> { - pub fn from_response_data(data: JsonRpcResponseEnum>, id: Box) -> Self { - match data { - JsonRpcResponseEnum::NullResult => { - let x: Box = Default::default(); - // TODO: how can we make this generic if this always wants to be a Box?. Do we even want to keep NullResult? - Self::from_result(Arc::from(x), id) - } - JsonRpcResponseEnum::RpcError { error_data, .. } => Self::from_error(error_data, id), - JsonRpcResponseEnum::Result { value, .. } => Self::from_result(value, id), - } - } -} - -impl ParsedResponse { - pub fn from_result(result: T, id: Box) -> Self { - Self { - jsonrpc: "2.0".to_string(), - id, - payload: Payload::Success { result }, - } - } - - pub fn from_error(error: JsonRpcErrorData, id: Box) -> Self { - Self { - jsonrpc: "2.0".to_string(), - id, - payload: Payload::Error { error }, - } - } - - pub fn result(&self) -> Option<&T> { - match &self.payload { - Payload::Success { result } => Some(result), - Payload::Error { .. } => None, - } - } - - pub fn into_result(self) -> Web3ProxyResult { - match self.payload { - Payload::Success { result } => Ok(result), - Payload::Error { error } => Err(Web3ProxyError::JsonRpcErrorData(error)), - } - } -} - -impl<'de, T> Deserialize<'de> for ParsedResponse -where - T: de::DeserializeOwned, -{ - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - struct ResponseVisitor(PhantomData); - impl<'de, T> de::Visitor<'de> for ResponseVisitor - where - T: de::DeserializeOwned, - { - type Value = ParsedResponse; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("a valid jsonrpc 2.0 response object") - } - - fn visit_map(self, mut map: A) -> Result - where - A: de::MapAccess<'de>, - { - let mut jsonrpc = None; - - // response & error - let mut id = None; - // only response - let mut result = None; - // only error - let mut error = None; - - while let Some(key) = map.next_key()? { - match key { - "jsonrpc" => { - if jsonrpc.is_some() { - return Err(de::Error::duplicate_field("jsonrpc")); - } - - let value = map.next_value()?; - if value != "2.0" { - return Err(de::Error::invalid_value( - de::Unexpected::Str(value), - &"2.0", - )); - } - - jsonrpc = Some(value); - } - "id" => { - if id.is_some() { - return Err(de::Error::duplicate_field("id")); - } - - let value: Box = map.next_value()?; - id = Some(value); - } - "result" => { - if result.is_some() { - return Err(de::Error::duplicate_field("result")); - } - - let value: T = map.next_value()?; - result = Some(value); - } - "error" => { - if error.is_some() { - return Err(de::Error::duplicate_field("Error")); - } - - let value: JsonRpcErrorData = map.next_value()?; - error = Some(value); - } - key => { - return Err(de::Error::unknown_field( - key, - &["jsonrpc", "id", "result", "error"], - )); - } - } - } - - let id = id.unwrap_or_default(); - - // jsonrpc version must be present in all responses - let jsonrpc = jsonrpc - .ok_or_else(|| de::Error::missing_field("jsonrpc"))? - .to_string(); - - let payload = match (result, error) { - (Some(result), None) => Payload::Success { result }, - (None, Some(error)) => Payload::Error { error }, - _ => { - return Err(de::Error::custom( - "response must be either a success or error object", - )) - } - }; - - Ok(ParsedResponse { - jsonrpc, - id, - payload, - }) - } - } - - deserializer.deserialize_map(ResponseVisitor(PhantomData)) - } -} - -#[derive(Debug, Deserialize, Serialize)] -#[serde(untagged)] -pub enum Payload { - Success { result: T }, - Error { error: JsonRpcErrorData }, -} - -#[derive(Debug)] -pub struct StreamResponse { - // TODO: phantom T on here? - buffer: Bytes, - response: reqwest::Response, - web3_request: Arc, -} - -impl StreamResponse { - // TODO: error handing - pub async fn read(self) -> Web3ProxyResult> - where - T: de::DeserializeOwned, - { - let mut buffer = BytesMut::with_capacity(self.buffer.len()); - buffer.extend_from_slice(&self.buffer); - buffer.extend_from_slice(&self.response.bytes().await?); - let parsed = serde_json::from_slice(&buffer)?; - Ok(parsed) - } -} - -impl IntoResponse for StreamResponse { - fn into_response(self) -> axum::response::Response { - let stream = stream::once(async { Ok::<_, reqwest::Error>(self.buffer) }) - .chain(self.response.bytes_stream()) - .map_ok(move |x| { - let len = x.len(); - - self.web3_request.add_response(len); - - x - }); - let body = StreamBody::new(stream); - body.into_response() - } -} - -#[derive(Debug)] -pub enum SingleResponse> { - /// TODO: save the size here so we don't have to serialize again - Parsed(ParsedResponse), - Stream(StreamResponse), -} - -impl SingleResponse -where - T: de::DeserializeOwned + Serialize, -{ - // TODO: threshold from configs - // TODO: error handling - // TODO: if a large stream's response's initial chunk "error" then we should buffer it - pub async fn read_if_short( - mut response: reqwest::Response, - nbytes: u64, - web3_request: &Arc, - ) -> Web3ProxyResult> { - Ok(Self::from_bytes(response.bytes().await?)?) - /* - match response.content_length() { - // short - Some(len) if len <= nbytes => Ok(Self::from_bytes(response.bytes().await?)?), - // long - Some(_) => Ok(Self::Stream(StreamResponse { - buffer: Bytes::new(), - response, - web3_request: web3_request.clone(), - })), - // unknown length. maybe compressed. maybe streaming. maybe both - None => { - let mut buffer = BytesMut::new(); - while (buffer.len() as u64) < nbytes { - match response.chunk().await? { - Some(chunk) => { - buffer.extend_from_slice(&chunk); - } - None => { - // it was short - return Ok(Self::from_bytes(buffer.freeze())?); - } - } - } - - // we've read nbytes of the response, but there is more to come - let buffer = buffer.freeze(); - Ok(Self::Stream(StreamResponse { - buffer, - response, - web3_request: web3_request.clone(), - })) - } - } - */ - } - - fn from_bytes(buf: Bytes) -> Result { - let val = serde_json::from_slice(&buf)?; - Ok(Self::Parsed(val)) - } - - // TODO: error handling - pub async fn parsed(self) -> Web3ProxyResult> { - match self { - Self::Parsed(resp) => Ok(resp), - Self::Stream(resp) => resp.read().await, - } - } - - pub fn num_bytes(&self) -> usize { - match self { - Self::Parsed(response) => serde_json::to_string(response) - .expect("this should always serialize") - .len(), - Self::Stream(response) => match response.response.content_length() { - Some(len) => len as usize, - None => 0, - }, - } - } - - pub fn set_id(&mut self, id: Box) { - match self { - SingleResponse::Parsed(x) => { - x.id = id; - } - SingleResponse::Stream(..) => { - // stream responses will hopefully always have the right id already because we pass the orignal id all the way from the front to the back - } - } - } -} - -impl From> for SingleResponse { - fn from(response: ParsedResponse) -> Self { - Self::Parsed(response) - } -} - -impl IntoResponse for SingleResponse -where - T: Serialize, -{ - fn into_response(self) -> axum::response::Response { - match self { - Self::Parsed(resp) => Json(resp).into_response(), - Self::Stream(resp) => resp.into_response(), - } - } -} - -#[derive(Debug)] -pub enum Response> { - Single(SingleResponse), - Batch(Vec>), -} - -impl Response> { - pub async fn to_json_string(self) -> Web3ProxyResult { - let x = match self { - Self::Single(resp) => { - // TODO: handle streaming differently? - let parsed = resp.parsed().await?; - - serde_json::to_string(&parsed) - } - Self::Batch(resps) => serde_json::to_string(&resps), - }; - - let x = x.expect("to_string should always work"); - - Ok(x) - } -} - -impl From> for Response { - fn from(response: ParsedResponse) -> Self { - Self::Single(SingleResponse::Parsed(response)) - } -} - -impl IntoResponse for Response -where - T: Serialize, -{ - fn into_response(self) -> axum::response::Response { - match self { - Self::Single(resp) => resp.into_response(), - Self::Batch(resps) => Json(resps).into_response(), - } - } -} - -// TODO: &str here instead of String should save a lot of allocations -// TODO: generic type for params? -#[serde_inline_default] -#[derive(Clone, Deserialize, Serialize)] -pub struct JsonRpcRequest { - pub jsonrpc: String, - /// id could be a stricter type, but many rpcs do things against the spec - /// TODO: this gets cloned into the response object often. would an Arc be better? That has its own overhead and these are short strings - pub id: Box, - pub method: String, - #[serde_inline_default(serde_json::Value::Null)] - pub params: serde_json::Value, -} - -#[derive(From)] -pub enum JsonRpcId { - None, - Number(u64), - String(String), - Raw(Box), -} - -impl JsonRpcId { - pub fn to_raw_value(self) -> Box { - // TODO: is this a good way to do this? we should probably use references - match self { - Self::None => Default::default(), - Self::Number(x) => { - serde_json::from_value(json!(x)).expect("number id should always work") - } - Self::String(x) => serde_json::from_str(&x).expect("string id should always work"), - Self::Raw(x) => x, - } - } -} - -impl JsonRpcRequest { - // TODO: Web3ProxyResult? can this even fail? - pub fn new(id: JsonRpcId, method: String, params: serde_json::Value) -> anyhow::Result { - let x = Self { - jsonrpc: "2.0".to_string(), - id: id.to_raw_value(), - method, - params, - }; - - Ok(x) - } - - pub fn validate_method(&self) -> bool { - self.method - .chars() - .all(|x| x.is_ascii_alphanumeric() || x == '_' || x == '(' || x == ')') - } -} - -impl fmt::Debug for JsonRpcRequest { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // TODO: the default formatter takes forever to write. this is too quiet though - // TODO: how should we include params in this? maybe just the length? - f.debug_struct("JsonRpcRequest") - .field("id", &self.id) - .field("method", &self.method) - .field("params", &self.params) - .finish() - } -} - -/// Requests can come in multiple formats -#[derive(Debug, From, Serialize)] -pub enum JsonRpcRequestEnum { - Batch(Vec), - Single(JsonRpcRequest), -} - -impl JsonRpcRequestEnum { - pub fn first_id(&self) -> Option> { - match self { - Self::Batch(x) => x.first().map(|x| x.id.clone()), - Self::Single(x) => Some(x.id.clone()), - } - } - - /// returns the id of the first invalid result (if any). None is good - pub fn validate(&self) -> Option> { - match self { - Self::Batch(x) => x - .iter() - .find_map(|x| (!x.validate_method()).then_some(x.id.clone())), - Self::Single(x) => { - if x.validate_method() { - None - } else { - Some(x.id.clone()) - } - } - } - } - - /// returns the id of the first invalid result (if any). None is good - pub async fn tarpit_invalid( - &self, - app: &Arc, - authorization: &Arc, - duration: Duration, - ) -> Result<(), AxumResponse> { - let err_id = match self.validate() { - None => return Ok(()), - Some(x) => x, - }; - - let size = serde_json::to_string(&self) - .expect("JsonRpcRequestEnum should always serialize") - .len(); - - // TODO: create a stat so we can penalize - // TODO: what request size - let request = Web3Request::new_with_app( - app, - authorization.clone(), - None, - RequestOrMethod::Method("invalid_method".into(), size), - None, - ) - .await - .unwrap(); - - request - .user_error_response - .store(true, atomic::Ordering::Release); - - let response = Web3ProxyError::BadRequest("request failed validation".into()); - - request.add_response(&response); - - let response = response.into_response_with_id(Some(err_id)); - - // TODO: variable duration depending on the IP - sleep(duration).await; - - let _ = request.try_send_arc_stat(); - - Err(response) - } -} - -impl<'de> Deserialize<'de> for JsonRpcRequestEnum { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - #[derive(Deserialize)] - #[serde(field_identifier, rename_all = "lowercase")] - enum Field { - JsonRpc, - Id, - Method, - Params, - } - - struct JsonRpcBatchVisitor; - - impl<'de> Visitor<'de> for JsonRpcBatchVisitor { - type Value = JsonRpcRequestEnum; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("JsonRpcRequestEnum") - } - - fn visit_seq(self, mut seq: V) -> Result - where - V: SeqAccess<'de>, - { - // TODO: what size should we use as the default? - let mut batch: Vec = - Vec::with_capacity(seq.size_hint().unwrap_or(10)); - - while let Ok(Some(s)) = seq.next_element::() { - batch.push(s); - } - - Ok(JsonRpcRequestEnum::Batch(batch)) - } - - fn visit_map(self, mut map: A) -> Result - where - A: MapAccess<'de>, - { - // TODO: i feel like this should be easier - let mut jsonrpc = None; - let mut id = None; - let mut method = None; - let mut params = None; - - while let Some(key) = map.next_key()? { - match key { - Field::JsonRpc => { - // throw away the value - // TODO: should we check that it's 2.0? - // TODO: how do we skip over this value entirely? - jsonrpc = Some(map.next_value()?); - } - Field::Id => { - if id.is_some() { - return Err(de::Error::duplicate_field("id")); - } - id = Some(map.next_value()?); - } - Field::Method => { - if method.is_some() { - return Err(de::Error::duplicate_field("method")); - } - method = Some(map.next_value()?); - } - Field::Params => { - if params.is_some() { - return Err(de::Error::duplicate_field("params")); - } - params = Some(map.next_value()?); - } - } - } - - // some providers don't follow the spec and dont include the jsonrpc key - // i think "2.0" should be a fine default to handle these incompatible clones - let jsonrpc = jsonrpc.unwrap_or_else(|| "2.0".to_string()); - // TODO: Errors returned by the try operator get shown in an ugly way - let id = id.ok_or_else(|| de::Error::missing_field("id"))?; - let method = method.ok_or_else(|| de::Error::missing_field("method"))?; - - let single = JsonRpcRequest { - jsonrpc, - id, - method, - params: params.unwrap_or_default(), - }; - - Ok(JsonRpcRequestEnum::Single(single)) - } - } - - let batch_visitor = JsonRpcBatchVisitor {}; - - deserializer.deserialize_any(batch_visitor) - } -} - -// TODO: impl Error on this? -/// All jsonrpc errors use this structure -#[derive(Debug, Deserialize, Serialize, Clone)] -pub struct JsonRpcErrorData { - /// The error code - pub code: i64, - /// The error message - pub message: Cow<'static, str>, - /// Additional data - #[serde(skip_serializing_if = "Option::is_none")] - pub data: Option, -} - -impl JsonRpcErrorData { - pub fn num_bytes(&self) -> usize { - serde_json::to_string(self) - .expect("should always serialize") - .len() - } - - pub fn is_retryable(&self) -> bool { - // TODO: move stuff from request to here - todo!() - } -} - -impl From<&'static str> for JsonRpcErrorData { - fn from(value: &'static str) -> Self { - Self { - code: -32000, - message: value.into(), - data: None, - } - } -} - -impl From for JsonRpcErrorData { - fn from(value: String) -> Self { - Self { - code: -32000, - message: value.into(), - data: None, - } - } -} - -/// A complete response -/// TODO: better Debug response -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct JsonRpcForwardedResponse { - // TODO: jsonrpc a &str? - pub jsonrpc: &'static str, - pub id: Box, - #[serde(skip_serializing_if = "Option::is_none")] - pub result: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - pub error: Option, -} - -impl JsonRpcRequest { - pub fn num_bytes(&self) -> usize { - // TODO: not sure how to do this without wasting a ton of allocations - serde_json::to_string(self) - .expect("this should always be valid json") - .len() - } -} - -impl JsonRpcForwardedResponse { - pub fn from_anyhow_error(err: anyhow::Error, code: Option, id: Box) -> Self { - let message = format!("{:?}", err); - - Self::from_string(message, code, id) - } - - pub fn from_str(message: &str, code: Option, id: Box) -> Self { - Self::from_string(message.to_string(), code, id) - } - - pub fn from_string(message: String, code: Option, id: Box) -> Self { - // TODO: this is too verbose. plenty of errors are valid, like users giving an invalid address. no need to log that - // TODO: can we somehow get the initial request here? if we put that into a tracing span, will things slow down a ton? - JsonRpcForwardedResponse { - jsonrpc: "2.0", - id, - result: None, - error: Some(JsonRpcErrorData { - code: code.unwrap_or(-32099), - message: message.into(), - // TODO: accept data as an argument - data: None, - }), - } - } - - pub fn from_raw_response(result: Arc, id: Box) -> Self { - JsonRpcForwardedResponse { - jsonrpc: "2.0", - id, - // TODO: since we only use the result here, should that be all we return from try_send_request? - result: Some(result), - error: None, - } - } - - pub fn from_value(result: serde_json::Value, id: Box) -> Self { - let partial_response = to_raw_value(&result).expect("Value to RawValue should always work"); - - // TODO: an Arc is a waste here. change JsonRpcForwardedResponse to take an enum? - let partial_response = partial_response.into(); - - JsonRpcForwardedResponse { - jsonrpc: "2.0", - id, - result: Some(partial_response), - error: None, - } - } - - pub fn from_response_data(data: JsonRpcResponseEnum>, id: Box) -> Self { - match data { - JsonRpcResponseEnum::NullResult => { - let x: Box = Default::default(); - Self::from_raw_response(x.into(), id) - } - JsonRpcResponseEnum::Result { value, .. } => Self::from_raw_response(value, id), - JsonRpcResponseEnum::RpcError { - error_data: value, .. - } => JsonRpcForwardedResponse { - jsonrpc: "2.0", - id, - result: None, - error: Some(value), - }, - } - } -} - -/// JSONRPC Responses can include one or many response objects. -#[derive(Clone, Debug, From, Serialize)] -#[serde(untagged)] -pub enum JsonRpcForwardedResponseEnum { - Single(JsonRpcForwardedResponse), - Batch(Vec), -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn deserialize_response() { - let json = r#"{"jsonrpc":"2.0","id":null,"result":100}"#; - let obj: ParsedResponse = serde_json::from_str(json).unwrap(); - assert!(matches!(obj.payload, Payload::Success { .. })); - } - - #[test] - fn serialize_response() { - let obj = ParsedResponse { - jsonrpc: "2.0".to_string(), - id: Default::default(), - payload: Payload::Success { - result: serde_json::value::RawValue::from_string("100".to_string()).unwrap(), - }, - }; - let json = serde_json::to_string(&obj).unwrap(); - assert_eq!(json, r#"{"jsonrpc":"2.0","id":null,"result":100}"#); - } - - #[test] - fn this_deserialize_single() { - let input = r#"{"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":1}"#; - - // test deserializing it directly to a single request object - let output: JsonRpcRequest = serde_json::from_str(input).unwrap(); - - assert_eq!(output.id.to_string(), "1"); - assert_eq!(output.method, "eth_blockNumber"); - assert_eq!(output.params.to_string(), "[]"); - - // test deserializing it into an enum - let output: JsonRpcRequestEnum = serde_json::from_str(input).unwrap(); - - assert!(matches!(output, JsonRpcRequestEnum::Single(_))); - } - - #[test] - fn this_deserialize_batch() { - let input = r#"[{"jsonrpc":"2.0","method":"eth_getCode","params":["0x5ba1e12693dc8f9c48aad8770482f4739beed696","0xe0e6a4"],"id":27},{"jsonrpc":"2.0","method":"eth_getTransactionCount","params":["0x5ba1e12693dc8f9c48aad8770482f4739beed696","0xe0e6a4"],"id":28},{"jsonrpc":"2.0","method":"eth_getBalance","params":["0x5ba1e12693dc8f9c48aad8770482f4739beed696","0xe0e6a4"],"id":29}]"#; - - // test deserializing it directly to a batch of request objects - let output: Vec = serde_json::from_str(input).unwrap(); - - assert_eq!(output.len(), 3); - - assert_eq!(output[0].id.to_string(), "27"); - assert_eq!(output[0].method, "eth_getCode"); - assert_eq!( - output[0].params.to_string(), - r#"["0x5ba1e12693dc8f9c48aad8770482f4739beed696","0xe0e6a4"]"# - ); - - assert_eq!(output[1].id.to_string(), "28"); - assert_eq!(output[2].id.to_string(), "29"); - - // test deserializing it into an enum - let output: JsonRpcRequestEnum = serde_json::from_str(input).unwrap(); - - assert!(matches!(output, JsonRpcRequestEnum::Batch(_))); - } -} diff --git a/web3_proxy/src/jsonrpc/error.rs b/web3_proxy/src/jsonrpc/error.rs new file mode 100644 index 00000000..faf3214b --- /dev/null +++ b/web3_proxy/src/jsonrpc/error.rs @@ -0,0 +1,48 @@ +use serde::{Deserialize, Serialize}; +use std::borrow::Cow; + +// TODO: impl Error on this? +/// All jsonrpc errors use this structure +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct JsonRpcErrorData { + /// The error code + pub code: i64, + /// The error message + pub message: Cow<'static, str>, + /// Additional data + #[serde(skip_serializing_if = "Option::is_none")] + pub data: Option, +} + +impl JsonRpcErrorData { + pub fn num_bytes(&self) -> usize { + serde_json::to_string(self) + .expect("should always serialize") + .len() + } + + pub fn is_retryable(&self) -> bool { + // TODO: move stuff from request to here + todo!() + } +} + +impl From<&'static str> for JsonRpcErrorData { + fn from(value: &'static str) -> Self { + Self { + code: -32000, + message: value.into(), + data: None, + } + } +} + +impl From for JsonRpcErrorData { + fn from(value: String) -> Self { + Self { + code: -32000, + message: value.into(), + data: None, + } + } +} diff --git a/web3_proxy/src/jsonrpc/id.rs b/web3_proxy/src/jsonrpc/id.rs new file mode 100644 index 00000000..1e0365cf --- /dev/null +++ b/web3_proxy/src/jsonrpc/id.rs @@ -0,0 +1,25 @@ +use derive_more::From; +use serde_json::{json, value::RawValue}; + +/// being strict on id doesn't really help much. just accept anything +#[derive(From)] +pub enum LooseId { + None, + Number(u64), + String(String), + Raw(Box), +} + +impl LooseId { + pub fn to_raw_value(self) -> Box { + // TODO: is this a good way to do this? maybe also have `as_raw_value`? + match self { + Self::None => Default::default(), + Self::Number(x) => { + serde_json::from_value(json!(x)).expect("number id should always work") + } + Self::String(x) => serde_json::from_str(&x).expect("string id should always work"), + Self::Raw(x) => x, + } + } +} diff --git a/web3_proxy/src/jsonrpc/mod.rs b/web3_proxy/src/jsonrpc/mod.rs new file mode 100644 index 00000000..3b794f52 --- /dev/null +++ b/web3_proxy/src/jsonrpc/mod.rs @@ -0,0 +1,85 @@ +pub mod error; +pub mod id; +pub mod request; +pub mod request_builder; +pub mod response; + +use std::fmt; + +pub use self::error::JsonRpcErrorData; +pub use self::id::LooseId; +pub use self::request::{JsonRpcRequestEnum, SingleRequest}; +pub use self::response::{ + ParsedResponse, Response, ResponsePayload, SingleResponse, StreamResponse, +}; + +pub trait JsonRpcParams = fmt::Debug + serde::Serialize + Send + Sync + 'static; +pub trait JsonRpcResultData = serde::Serialize + serde::de::DeserializeOwned + fmt::Debug + Send; + +#[cfg(test)] +mod tests { + use super::request::{JsonRpcRequestEnum, SingleRequest}; + use super::response::{ParsedResponse, ResponsePayload}; + + #[test] + fn deserialize_response() { + let json = r#"{"jsonrpc":"2.0","id":null,"result":100}"#; + let obj: ParsedResponse = serde_json::from_str(json).unwrap(); + assert!(matches!(obj.payload, ResponsePayload::Success { .. })); + } + + #[test] + fn serialize_response() { + let obj = ParsedResponse { + jsonrpc: "2.0".to_string(), + id: Default::default(), + payload: ResponsePayload::Success { + result: serde_json::value::RawValue::from_string("100".to_string()).unwrap(), + }, + }; + let json = serde_json::to_string(&obj).unwrap(); + assert_eq!(json, r#"{"jsonrpc":"2.0","id":null,"result":100}"#); + } + + #[test] + fn this_deserialize_single() { + let input = r#"{"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":1}"#; + + // test deserializing it directly to a single request object + let output: SingleRequest = serde_json::from_str(input).unwrap(); + + assert_eq!(output.id.to_string(), "1"); + assert_eq!(output.method, "eth_blockNumber"); + assert_eq!(output.params.to_string(), "[]"); + + // test deserializing it into an enum + let output: JsonRpcRequestEnum = serde_json::from_str(input).unwrap(); + + assert!(matches!(output, JsonRpcRequestEnum::Single(_))); + } + + #[test] + fn this_deserialize_batch() { + let input = r#"[{"jsonrpc":"2.0","method":"eth_getCode","params":["0x5ba1e12693dc8f9c48aad8770482f4739beed696","0xe0e6a4"],"id":27},{"jsonrpc":"2.0","method":"eth_getTransactionCount","params":["0x5ba1e12693dc8f9c48aad8770482f4739beed696","0xe0e6a4"],"id":28},{"jsonrpc":"2.0","method":"eth_getBalance","params":["0x5ba1e12693dc8f9c48aad8770482f4739beed696","0xe0e6a4"],"id":29}]"#; + + // test deserializing it directly to a batch of request objects + let output: Vec = serde_json::from_str(input).unwrap(); + + assert_eq!(output.len(), 3); + + assert_eq!(output[0].id.to_string(), "27"); + assert_eq!(output[0].method, "eth_getCode"); + assert_eq!( + output[0].params.to_string(), + r#"["0x5ba1e12693dc8f9c48aad8770482f4739beed696","0xe0e6a4"]"# + ); + + assert_eq!(output[1].id.to_string(), "28"); + assert_eq!(output[2].id.to_string(), "29"); + + // test deserializing it into an enum + let output: JsonRpcRequestEnum = serde_json::from_str(input).unwrap(); + + assert!(matches!(output, JsonRpcRequestEnum::Batch(_))); + } +} diff --git a/web3_proxy/src/jsonrpc/request.rs b/web3_proxy/src/jsonrpc/request.rs new file mode 100644 index 00000000..01c5aff5 --- /dev/null +++ b/web3_proxy/src/jsonrpc/request.rs @@ -0,0 +1,248 @@ +use axum::response::Response as AxumResponse; +use derive_more::From; +use serde::de::{self, Deserializer, MapAccess, SeqAccess, Visitor}; +use serde::{Deserialize, Serialize}; +use serde_inline_default::serde_inline_default; +use serde_json::value::RawValue; +use std::fmt; +use std::sync::{atomic, Arc}; +use std::time::Duration; +use tokio::time::sleep; + +use crate::app::Web3ProxyApp; +use crate::errors::Web3ProxyError; +use crate::frontend::authorization::{Authorization, RequestOrMethod, Web3Request}; + +use super::LooseId; + +// TODO: &str here instead of String should save a lot of allocations +// TODO: generic type for params? +#[serde_inline_default] +#[derive(Clone, Deserialize, Serialize)] +pub struct SingleRequest { + pub jsonrpc: String, + /// id could be a stricter type, but many rpcs do things against the spec + /// TODO: this gets cloned into the response object often. would an Arc be better? That has its own overhead and these are short strings + pub id: Box, + pub method: String, + #[serde_inline_default(serde_json::Value::Null)] + pub params: serde_json::Value, +} + +impl SingleRequest { + // TODO: Web3ProxyResult? can this even fail? + pub fn new(id: LooseId, method: String, params: serde_json::Value) -> anyhow::Result { + let x = Self { + jsonrpc: "2.0".to_string(), + id: id.to_raw_value(), + method, + params, + }; + + Ok(x) + } + + /// TODO: not sure how to do this without wasting a ton of allocations + pub fn num_bytes(&self) -> usize { + serde_json::to_string(self) + .expect("this should always be valid json") + .len() + } + + pub fn validate_method(&self) -> bool { + self.method + .chars() + .all(|x| x.is_ascii_alphanumeric() || x == '_' || x == '(' || x == ')') + } +} + +impl fmt::Debug for SingleRequest { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // TODO: the default formatter takes forever to write. this is too quiet though + // TODO: how should we include params in this? maybe just the length? + f.debug_struct("JsonRpcRequest") + .field("id", &self.id) + .field("method", &self.method) + .field("params", &self.params) + .finish() + } +} + +/// Requests can come in multiple formats +#[derive(Debug, From, Serialize)] +pub enum JsonRpcRequestEnum { + Batch(Vec), + Single(SingleRequest), +} + +impl JsonRpcRequestEnum { + pub fn first_id(&self) -> Option> { + match self { + Self::Batch(x) => x.first().map(|x| x.id.clone()), + Self::Single(x) => Some(x.id.clone()), + } + } + + /// returns the id of the first invalid result (if any). None is good + pub fn validate(&self) -> Option> { + match self { + Self::Batch(x) => x + .iter() + .find_map(|x| (!x.validate_method()).then_some(x.id.clone())), + Self::Single(x) => { + if x.validate_method() { + None + } else { + Some(x.id.clone()) + } + } + } + } + + /// returns the id of the first invalid result (if any). None is good + pub async fn tarpit_invalid( + &self, + app: &Arc, + authorization: &Arc, + duration: Duration, + ) -> Result<(), AxumResponse> { + let err_id = match self.validate() { + None => return Ok(()), + Some(x) => x, + }; + + let size = serde_json::to_string(&self) + .expect("JsonRpcRequestEnum should always serialize") + .len(); + + // TODO: create a stat so we can penalize + // TODO: what request size + let request = Web3Request::new_with_app( + app, + authorization.clone(), + None, + RequestOrMethod::Method("invalid_method".into(), size), + None, + ) + .await + .unwrap(); + + request + .user_error_response + .store(true, atomic::Ordering::Release); + + let response = Web3ProxyError::BadRequest("request failed validation".into()); + + request.add_response(&response); + + let response = response.into_response_with_id(Some(err_id)); + + // TODO: variable duration depending on the IP + sleep(duration).await; + + let _ = request.try_send_arc_stat(); + + Err(response) + } +} + +impl<'de> Deserialize<'de> for JsonRpcRequestEnum { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + #[derive(Deserialize)] + #[serde(field_identifier, rename_all = "lowercase")] + enum Field { + JsonRpc, + Id, + Method, + Params, + } + + struct JsonRpcBatchVisitor; + + impl<'de> Visitor<'de> for JsonRpcBatchVisitor { + type Value = JsonRpcRequestEnum; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("JsonRpcRequestEnum") + } + + fn visit_seq(self, mut seq: V) -> Result + where + V: SeqAccess<'de>, + { + // TODO: what size should we use as the default? + let mut batch: Vec = + Vec::with_capacity(seq.size_hint().unwrap_or(10)); + + while let Ok(Some(s)) = seq.next_element::() { + batch.push(s); + } + + Ok(JsonRpcRequestEnum::Batch(batch)) + } + + fn visit_map(self, mut map: A) -> Result + where + A: MapAccess<'de>, + { + // TODO: i feel like this should be easier + let mut jsonrpc = None; + let mut id = None; + let mut method = None; + let mut params = None; + + while let Some(key) = map.next_key()? { + match key { + Field::JsonRpc => { + // throw away the value + // TODO: should we check that it's 2.0? + // TODO: how do we skip over this value entirely? + jsonrpc = Some(map.next_value()?); + } + Field::Id => { + if id.is_some() { + return Err(de::Error::duplicate_field("id")); + } + id = Some(map.next_value()?); + } + Field::Method => { + if method.is_some() { + return Err(de::Error::duplicate_field("method")); + } + method = Some(map.next_value()?); + } + Field::Params => { + if params.is_some() { + return Err(de::Error::duplicate_field("params")); + } + params = Some(map.next_value()?); + } + } + } + + // some providers don't follow the spec and dont include the jsonrpc key + // i think "2.0" should be a fine default to handle these incompatible clones + let jsonrpc = jsonrpc.unwrap_or_else(|| "2.0".to_string()); + // TODO: Errors returned by the try operator get shown in an ugly way + let id = id.ok_or_else(|| de::Error::missing_field("id"))?; + let method = method.ok_or_else(|| de::Error::missing_field("method"))?; + + let single = SingleRequest { + jsonrpc, + id, + method, + params: params.unwrap_or_default(), + }; + + Ok(JsonRpcRequestEnum::Single(single)) + } + } + + let batch_visitor = JsonRpcBatchVisitor {}; + + deserializer.deserialize_any(batch_visitor) + } +} diff --git a/web3_proxy/src/jsonrpc/request_builder.rs b/web3_proxy/src/jsonrpc/request_builder.rs new file mode 100644 index 00000000..e69de29b diff --git a/web3_proxy/src/jsonrpc/response.rs b/web3_proxy/src/jsonrpc/response.rs new file mode 100644 index 00000000..adb73c8a --- /dev/null +++ b/web3_proxy/src/jsonrpc/response.rs @@ -0,0 +1,395 @@ +use axum::body::StreamBody; +use axum::response::IntoResponse; +use axum::Json; +use bytes::{Bytes, BytesMut}; +use futures_util::stream::{self, StreamExt}; +use futures_util::TryStreamExt; +use serde::{de, Deserialize, Serialize}; +use serde_json::value::RawValue; +use std::fmt; +use std::marker::PhantomData; +use std::sync::Arc; + +use crate::errors::{Web3ProxyError, Web3ProxyResult}; +use crate::frontend::authorization::Web3Request; +use crate::response_cache::ForwardedResponse; + +use super::JsonRpcErrorData; + +pub trait JsonRpcParams = fmt::Debug + serde::Serialize + Send + Sync + 'static; +pub trait JsonRpcResultData = serde::Serialize + serde::de::DeserializeOwned + fmt::Debug + Send; + +/// TODO: borrow values to avoid allocs if possible +/// TODO: lots of overlap with `SingleForwardedResponse` +#[derive(Debug, Serialize)] +pub struct ParsedResponse> { + pub jsonrpc: String, + pub id: Box, + #[serde(flatten)] + pub payload: ResponsePayload, +} + +impl ParsedResponse { + pub fn from_value(value: serde_json::Value, id: Box) -> Self { + let result = serde_json::value::to_raw_value(&value) + .expect("this should not fail") + .into(); + Self::from_result(result, id) + } +} + +impl ParsedResponse> { + pub fn from_response_data(data: ForwardedResponse>, id: Box) -> Self { + match data { + ForwardedResponse::NullResult => { + let x: Box = Default::default(); + // TODO: how can we make this generic if this always wants to be a Box?. Do we even want to keep NullResult? + Self::from_result(Arc::from(x), id) + } + ForwardedResponse::RpcError { error_data, .. } => Self::from_error(error_data, id), + ForwardedResponse::Result { value, .. } => Self::from_result(value, id), + } + } +} + +impl ParsedResponse { + pub fn from_result(result: T, id: Box) -> Self { + Self { + jsonrpc: "2.0".to_string(), + id, + payload: ResponsePayload::Success { result }, + } + } + + pub fn from_error(error: JsonRpcErrorData, id: Box) -> Self { + Self { + jsonrpc: "2.0".to_string(), + id, + payload: ResponsePayload::Error { error }, + } + } + + pub fn result(&self) -> Option<&T> { + match &self.payload { + ResponsePayload::Success { result } => Some(result), + ResponsePayload::Error { .. } => None, + } + } + + pub fn into_result(self) -> Web3ProxyResult { + match self.payload { + ResponsePayload::Success { result } => Ok(result), + ResponsePayload::Error { error } => Err(Web3ProxyError::JsonRpcErrorData(error)), + } + } +} + +impl<'de, T> Deserialize<'de> for ParsedResponse +where + T: de::DeserializeOwned, +{ + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + struct ResponseVisitor(PhantomData); + impl<'de, T> de::Visitor<'de> for ResponseVisitor + where + T: de::DeserializeOwned, + { + type Value = ParsedResponse; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("a valid jsonrpc 2.0 response object") + } + + fn visit_map(self, mut map: A) -> Result + where + A: de::MapAccess<'de>, + { + let mut jsonrpc = None; + + // response & error + let mut id = None; + // only response + let mut result = None; + // only error + let mut error = None; + + while let Some(key) = map.next_key()? { + match key { + "jsonrpc" => { + if jsonrpc.is_some() { + return Err(de::Error::duplicate_field("jsonrpc")); + } + + let value = map.next_value()?; + if value != "2.0" { + return Err(de::Error::invalid_value( + de::Unexpected::Str(value), + &"2.0", + )); + } + + jsonrpc = Some(value); + } + "id" => { + if id.is_some() { + return Err(de::Error::duplicate_field("id")); + } + + let value: Box = map.next_value()?; + id = Some(value); + } + "result" => { + if result.is_some() { + return Err(de::Error::duplicate_field("result")); + } + + let value: T = map.next_value()?; + result = Some(value); + } + "error" => { + if error.is_some() { + return Err(de::Error::duplicate_field("Error")); + } + + let value: JsonRpcErrorData = map.next_value()?; + error = Some(value); + } + key => { + return Err(de::Error::unknown_field( + key, + &["jsonrpc", "id", "result", "error"], + )); + } + } + } + + let id = id.unwrap_or_default(); + + // jsonrpc version must be present in all responses + let jsonrpc = jsonrpc + .ok_or_else(|| de::Error::missing_field("jsonrpc"))? + .to_string(); + + let payload = match (result, error) { + (Some(result), None) => ResponsePayload::Success { result }, + (None, Some(error)) => ResponsePayload::Error { error }, + _ => { + return Err(de::Error::custom( + "response must be either a success or error object", + )) + } + }; + + Ok(ParsedResponse { + jsonrpc, + id, + payload, + }) + } + } + + deserializer.deserialize_map(ResponseVisitor(PhantomData)) + } +} + +#[derive(Debug, Deserialize, Serialize)] +#[serde(untagged)] +pub enum ResponsePayload { + Success { result: T }, + Error { error: JsonRpcErrorData }, +} + +#[derive(Debug)] +pub struct StreamResponse { + _t: PhantomData, + buffer: Bytes, + response: reqwest::Response, + web3_request: Arc, +} + +impl StreamResponse { + // TODO: error handing + pub async fn read(self) -> Web3ProxyResult> + where + T: de::DeserializeOwned, + { + let mut buffer = BytesMut::with_capacity(self.buffer.len()); + buffer.extend_from_slice(&self.buffer); + buffer.extend_from_slice(&self.response.bytes().await?); + let parsed = serde_json::from_slice(&buffer)?; + Ok(parsed) + } +} + +impl IntoResponse for StreamResponse { + fn into_response(self) -> axum::response::Response { + let stream = stream::once(async { Ok::<_, reqwest::Error>(self.buffer) }) + .chain(self.response.bytes_stream()) + .map_ok(move |x| { + let len = x.len(); + + self.web3_request.add_response(len); + + x + }); + let body = StreamBody::new(stream); + body.into_response() + } +} + +#[derive(Debug)] +pub enum SingleResponse> { + /// TODO: save the size here so we don't have to serialize again + Parsed(ParsedResponse), + Stream(StreamResponse), +} + +impl SingleResponse +where + T: de::DeserializeOwned + Serialize, +{ + // TODO: threshold from configs + // TODO: error handling + // TODO: if a large stream's response's initial chunk "error" then we should buffer it + pub async fn read_if_short( + mut response: reqwest::Response, + nbytes: u64, + web3_request: &Arc, + ) -> Web3ProxyResult> { + Ok(Self::from_bytes(response.bytes().await?)?) + /* + match response.content_length() { + // short + Some(len) if len <= nbytes => Ok(Self::from_bytes(response.bytes().await?)?), + // long + Some(_) => Ok(Self::Stream(StreamResponse { + buffer: Bytes::new(), + response, + web3_request: web3_request.clone(), + })), + // unknown length. maybe compressed. maybe streaming. maybe both + None => { + let mut buffer = BytesMut::new(); + while (buffer.len() as u64) < nbytes { + match response.chunk().await? { + Some(chunk) => { + buffer.extend_from_slice(&chunk); + } + None => { + // it was short + return Ok(Self::from_bytes(buffer.freeze())?); + } + } + } + + // we've read nbytes of the response, but there is more to come + let buffer = buffer.freeze(); + Ok(Self::Stream(StreamResponse { + buffer, + response, + web3_request: web3_request.clone(), + })) + } + } + */ + } + + fn from_bytes(buf: Bytes) -> Result { + let val = serde_json::from_slice(&buf)?; + Ok(Self::Parsed(val)) + } + + // TODO: error handling + pub async fn parsed(self) -> Web3ProxyResult> { + match self { + Self::Parsed(resp) => Ok(resp), + Self::Stream(resp) => resp.read().await, + } + } + + pub fn num_bytes(&self) -> usize { + match self { + Self::Parsed(response) => serde_json::to_string(response) + .expect("this should always serialize") + .len(), + Self::Stream(response) => match response.response.content_length() { + Some(len) => len as usize, + None => 0, + }, + } + } + + pub fn set_id(&mut self, id: Box) { + match self { + SingleResponse::Parsed(x) => { + x.id = id; + } + SingleResponse::Stream(..) => { + // stream responses will hopefully always have the right id already because we pass the orignal id all the way from the front to the back + } + } + } +} + +impl From> for SingleResponse { + fn from(response: ParsedResponse) -> Self { + Self::Parsed(response) + } +} + +impl IntoResponse for SingleResponse +where + T: Serialize, +{ + fn into_response(self) -> axum::response::Response { + match self { + Self::Parsed(resp) => Json(resp).into_response(), + Self::Stream(resp) => resp.into_response(), + } + } +} + +#[derive(Debug)] +pub enum Response> { + Single(SingleResponse), + Batch(Vec>), +} + +impl Response> { + pub async fn to_json_string(self) -> Web3ProxyResult { + let x = match self { + Self::Single(resp) => { + // TODO: handle streaming differently? + let parsed = resp.parsed().await?; + + serde_json::to_string(&parsed) + } + Self::Batch(resps) => serde_json::to_string(&resps), + }; + + let x = x.expect("to_string should always work"); + + Ok(x) + } +} + +impl From> for Response { + fn from(response: ParsedResponse) -> Self { + Self::Single(SingleResponse::Parsed(response)) + } +} + +impl IntoResponse for Response +where + T: Serialize, +{ + fn into_response(self) -> axum::response::Response { + match self { + Self::Single(resp) => resp.into_response(), + Self::Batch(resps) => Json(resps).into_response(), + } + } +} diff --git a/web3_proxy/src/response_cache.rs b/web3_proxy/src/response_cache.rs index 3649354d..b25c58f9 100644 --- a/web3_proxy/src/response_cache.rs +++ b/web3_proxy/src/response_cache.rs @@ -86,12 +86,12 @@ impl<'a> JsonRpcQueryCacheKey<'a> { } } -// TODO: i think if we change this to Arc>>, we can speed things up -pub type JsonRpcResponseCache = Cache>>; +// TODO: i think if we change this to Arc>>, we can speed things up +pub type JsonRpcResponseCache = Cache>>; /// TODO: we might need one that holds RawValue and one that holds serde_json::Value #[derive(Clone, Debug)] -pub enum JsonRpcResponseEnum { +pub enum ForwardedResponse { NullResult, Result { value: R, @@ -104,7 +104,7 @@ pub enum JsonRpcResponseEnum { } // TODO: impl for other inner result types? -impl JsonRpcResponseEnum { +impl ForwardedResponse { pub fn num_bytes(&self) -> u32 { match self { Self::NullResult => 1, @@ -122,13 +122,13 @@ impl JsonRpcResponseEnum { } } -impl JsonRpcResponseEnum> { +impl ForwardedResponse> { pub fn is_null(&self) -> bool { matches!(self, Self::NullResult | Self::Result { value: None, .. }) } } -impl JsonRpcResponseEnum> { +impl ForwardedResponse> { pub fn is_null(&self) -> bool { match self { Self::NullResult => true, @@ -138,21 +138,21 @@ impl JsonRpcResponseEnum> { } } -impl TryFrom> for JsonRpcResponseEnum> { +impl TryFrom> for ForwardedResponse> { type Error = Web3ProxyError; fn try_from(response: Web3ProxyResult) -> Result { match response { Ok(jsonrpc::SingleResponse::Parsed(parsed)) => match parsed.payload { - jsonrpc::Payload::Success { result } => { + jsonrpc::ResponsePayload::Success { result } => { let num_bytes = result.get().len() as u32; - Ok(JsonRpcResponseEnum::Result { + Ok(ForwardedResponse::Result { value: result, num_bytes, }) } - jsonrpc::Payload::Error { error } => { + jsonrpc::ResponsePayload::Error { error } => { let num_bytes = error.num_bytes() as u32; - Ok(JsonRpcResponseEnum::RpcError { + Ok(ForwardedResponse::RpcError { error_data: error, // TODO: this double serializes num_bytes, @@ -167,7 +167,7 @@ impl TryFrom> for JsonRpcResponseEnum for JsonRpcResponseEnum> { +impl From for ForwardedResponse> { fn from(value: serde_json::Value) -> Self { let value = RawValue::from_string(value.to_string()).unwrap(); @@ -175,7 +175,7 @@ impl From for JsonRpcResponseEnum> { } } -impl From> for JsonRpcResponseEnum> { +impl From> for ForwardedResponse> { fn from(value: Arc) -> Self { let num_bytes = value.get().len(); @@ -185,7 +185,7 @@ impl From> for JsonRpcResponseEnum> { } } -impl From> for JsonRpcResponseEnum> { +impl From> for ForwardedResponse> { fn from(value: Box) -> Self { let num_bytes = value.get().len(); @@ -197,7 +197,7 @@ impl From> for JsonRpcResponseEnum> { } } -impl TryFrom for JsonRpcResponseEnum> { +impl TryFrom for ForwardedResponse> { type Error = Web3ProxyError; fn try_from(value: Web3ProxyError) -> Result { @@ -206,7 +206,7 @@ impl TryFrom for JsonRpcResponseEnum> { Ok(x) => Ok(x.into()), Err(..) => Err(err.into()), }, - Web3ProxyError::NullJsonRpcResult => Ok(JsonRpcResponseEnum::NullResult), + Web3ProxyError::NullJsonRpcResult => Ok(ForwardedResponse::NullResult), Web3ProxyError::JsonRpcResponse(x) => Ok(x), Web3ProxyError::JsonRpcErrorData(err) => Ok(err.into()), err => Err(err), @@ -214,7 +214,7 @@ impl TryFrom for JsonRpcResponseEnum> { } } -impl TryFrom, Web3ProxyError>> for JsonRpcResponseEnum> { +impl TryFrom, Web3ProxyError>> for ForwardedResponse> { type Error = Web3ProxyError; fn try_from(value: Result, Web3ProxyError>) -> Result { @@ -228,7 +228,7 @@ impl TryFrom, Web3ProxyError>> for JsonRpcResponseEnum, Web3ProxyError>> for JsonRpcResponseEnum> { +impl TryFrom, Web3ProxyError>> for ForwardedResponse> { type Error = Web3ProxyError; fn try_from(value: Result, Web3ProxyError>) -> Result { @@ -243,7 +243,7 @@ impl TryFrom, Web3ProxyError>> for JsonRpcResponseEnum From for JsonRpcResponseEnum { +impl From for ForwardedResponse { fn from(value: JsonRpcErrorData) -> Self { // TODO: wrap the error in a complete response? let num_bytes = serde_json::to_string(&value).unwrap().len(); @@ -317,7 +317,7 @@ impl<'a> TryFrom<&'a WsClientError> for JsonRpcErrorData { pub struct JsonRpcResponseWeigher(pub u32); impl JsonRpcResponseWeigher { - pub fn weigh(&self, _key: &K, value: &JsonRpcResponseEnum) -> u32 { + pub fn weigh(&self, _key: &K, value: &ForwardedResponse) -> u32 { let x = value.num_bytes(); if x > self.0 { @@ -331,7 +331,7 @@ impl JsonRpcResponseWeigher { #[cfg(test)] mod tests { - use super::JsonRpcResponseEnum; + use super::ForwardedResponse; use crate::response_cache::JsonRpcResponseWeigher; use moka::future::{Cache, CacheBuilder}; use serde_json::value::RawValue; @@ -344,28 +344,28 @@ mod tests { let weigher = JsonRpcResponseWeigher(max_item_weight); - let small_data: JsonRpcResponseEnum> = JsonRpcResponseEnum::Result { + let small_data: ForwardedResponse> = ForwardedResponse::Result { value: Box::::default().into(), num_bytes: max_item_weight / 2, }; assert_eq!(weigher.weigh(&(), &small_data), max_item_weight / 2); - let max_sized_data: JsonRpcResponseEnum> = JsonRpcResponseEnum::Result { + let max_sized_data: ForwardedResponse> = ForwardedResponse::Result { value: Box::::default().into(), num_bytes: max_item_weight, }; assert_eq!(weigher.weigh(&(), &max_sized_data), max_item_weight); - let oversized_data: JsonRpcResponseEnum> = JsonRpcResponseEnum::Result { + let oversized_data: ForwardedResponse> = ForwardedResponse::Result { value: Box::::default().into(), num_bytes: max_item_weight * 2, }; assert_eq!(weigher.weigh(&(), &oversized_data), u32::MAX); - let test_cache: Cache>> = + let test_cache: Cache>> = CacheBuilder::new(weight_capacity) .weigher(move |k, v| weigher.weigh(k, v)) .time_to_live(Duration::from_secs(2)) diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index c31984eb..a0870eeb 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -490,9 +490,9 @@ impl Web3Rpcs { let parsed = response.parsed().await?; match parsed.payload { - jsonrpc::Payload::Success { result } => Ok(result), + jsonrpc::ResponsePayload::Success { result } => Ok(result), // TODO: confirm this error type is correct - jsonrpc::Payload::Error { error } => Err(error.into()), + jsonrpc::ResponsePayload::Error { error } => Err(error.into()), } } diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 6e989c09..ad3f88e8 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -1338,8 +1338,8 @@ impl Web3Rpc { let response = handle.request().await?; let parsed = response.parsed().await?; match parsed.payload { - jsonrpc::Payload::Success { result } => Ok(result), - jsonrpc::Payload::Error { error } => Err(error.into()), + jsonrpc::ResponsePayload::Success { result } => Ok(result), + jsonrpc::ResponsePayload::Error { error } => Err(error.into()), } } } diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 7defdf9b..d96c6b75 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -2,7 +2,7 @@ use super::one::Web3Rpc; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::frontend::authorization::{Authorization, AuthorizationType, Web3Request}; use crate::globals::{global_db_conn, DB_CONN}; -use crate::jsonrpc::{self, JsonRpcErrorData, JsonRpcResultData, Payload}; +use crate::jsonrpc::{self, JsonRpcErrorData, JsonRpcResultData, ResponsePayload}; use anyhow::Context; use chrono::Utc; use derive_more::From; @@ -339,7 +339,7 @@ impl OpenRequestHandle { // TODO: counters for errors vs jsonrpc vs success? let response_is_success = match &response { Ok(jsonrpc::SingleResponse::Parsed(x)) => { - matches!(&x.payload, Payload::Success { .. }) + matches!(&x.payload, ResponsePayload::Success { .. }) } Ok(jsonrpc::SingleResponse::Stream(..)) => true, Err(_) => false, @@ -367,8 +367,8 @@ impl OpenRequestHandle { let response_type: ResponseType = match &response { Ok(jsonrpc::SingleResponse::Parsed(x)) => match &x.payload { - Payload::Success { .. } => unreachable!(), - Payload::Error { error } => { + ResponsePayload::Success { .. } => unreachable!(), + ResponsePayload::Error { error } => { trace!(?error, "jsonrpc error data"); if error.message.starts_with("execution reverted") {