split web3_proxy/src/jsonrpc.rs into multiple files and DRY it up

This commit is contained in:
Bryan Stitt 2023-10-11 00:12:20 -07:00
parent 0f92f457b0
commit 57f8c4bd40
17 changed files with 886 additions and 947 deletions

View File

@ -6,11 +6,11 @@ use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::authorization::{Authorization, Web3Request};
use crate::globals::{global_db_conn, DatabaseError, APP, DB_CONN, DB_REPLICA};
use crate::jsonrpc::{
self, JsonRpcErrorData, JsonRpcId, JsonRpcParams, JsonRpcRequest, JsonRpcRequestEnum,
JsonRpcResultData, SingleResponse,
self, JsonRpcErrorData, JsonRpcParams, JsonRpcRequestEnum, JsonRpcResultData, LooseId,
SingleRequest, SingleResponse,
};
use crate::relational_db::{connect_db, migrate_db};
use crate::response_cache::{JsonRpcResponseCache, JsonRpcResponseEnum, JsonRpcResponseWeigher};
use crate::response_cache::{ForwardedResponse, JsonRpcResponseCache, JsonRpcResponseWeigher};
use crate::rpcs::blockchain::Web3ProxyBlock;
use crate::rpcs::consensus::RankedRpcs;
use crate::rpcs::many::Web3Rpcs;
@ -983,14 +983,16 @@ impl Web3ProxyApp {
authorization: Arc<Authorization>,
) -> Web3ProxyResult<R> {
// TODO: proper ids
let request = JsonRpcRequest::new(JsonRpcId::Number(1), method.to_string(), json!(params))?;
let request = SingleRequest::new(LooseId::Number(1), method.to_string(), json!(params))?;
let (_, response, _) = self.proxy_request(request, authorization, None).await;
// TODO: error handling?
match response.parsed().await?.payload {
jsonrpc::Payload::Success { result } => Ok(serde_json::from_str(result.get())?),
jsonrpc::Payload::Error { error } => Err(Web3ProxyError::JsonRpcErrorData(error)),
jsonrpc::ResponsePayload::Success { result } => Ok(serde_json::from_str(result.get())?),
jsonrpc::ResponsePayload::Error { error } => {
Err(Web3ProxyError::JsonRpcErrorData(error))
}
}
}
@ -1028,7 +1030,7 @@ impl Web3ProxyApp {
async fn proxy_web3_rpc_requests(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
requests: Vec<JsonRpcRequest>,
requests: Vec<SingleRequest>,
) -> Web3ProxyResult<(Vec<jsonrpc::ParsedResponse>, Vec<Arc<Web3Rpc>>)> {
// TODO: we should probably change ethers-rs to support this directly. they pushed this off to v2 though
let num_requests = requests.len();
@ -1109,7 +1111,7 @@ impl Web3ProxyApp {
/// proxy request with up to 3 tries.
async fn proxy_request(
self: &Arc<Self>,
request: JsonRpcRequest,
request: SingleRequest,
authorization: Arc<Authorization>,
head_block: Option<Web3ProxyBlock>,
) -> (StatusCode, jsonrpc::SingleResponse, Vec<Arc<Web3Rpc>>) {
@ -1430,7 +1432,7 @@ impl Web3ProxyApp {
// sometimes we get an error that the transaction is already known by our nodes,
// that's not really an error. Return the hash like a successful response would.
// TODO: move this to a helper function. probably part of try_send_protected
if let JsonRpcResponseEnum::RpcError{ error_data, ..} = &response {
if let ForwardedResponse::RpcError{ error_data, ..} = &response {
if error_data.code == -32000
&& (error_data.message == "ALREADY_EXISTS: already known"
|| error_data.message == "INTERNAL_ERROR: existing tx with same hash")
@ -1466,7 +1468,7 @@ impl Web3ProxyApp {
trace!("tx_hash: {:#?}", tx_hash);
response = JsonRpcResponseEnum::from(tx_hash);
response = ForwardedResponse::from(tx_hash);
}
}
}
@ -1477,7 +1479,7 @@ impl Web3ProxyApp {
// TODO: use this cache to avoid sending duplicate transactions?
// TODO: different salt for ips and transactions?
if let Some(ref salt) = self.config.public_recent_ips_salt {
if let JsonRpcResponseEnum::Result { value, .. } = &response {
if let ForwardedResponse::Result { value, .. } = &response {
let now = Utc::now().timestamp();
let app = self.clone();
@ -1671,12 +1673,12 @@ impl Web3ProxyApp {
Ok(response_data) => {
if !web3_request.cache_jsonrpc_errors() && let Err(err) = response_data {
// if we are not supposed to cache jsonrpc errors,
// then we must not convert Provider errors into a JsonRpcResponseEnum
// then we must not convert Provider errors into a ForwardedResponse
// return all the errors now. moka will not cache Err results
Err(err)
} else {
// convert jsonrpc errors into JsonRpcResponseEnum, but leave the rest as errors
let response_data: JsonRpcResponseEnum<Arc<RawValue>> = response_data.try_into()?;
// convert jsonrpc errors into ForwardedResponse, but leave the rest as errors
let response_data: ForwardedResponse<Arc<RawValue>> = response_data.try_into()?;
if response_data.is_null() {
// don't ever cache "null" as a success. its too likely to be a problem
@ -1686,7 +1688,7 @@ impl Web3ProxyApp {
// TODO: emit a stat
Err(Web3ProxyError::JsonRpcResponse(response_data))
} else {
// TODO: response data should maybe be Arc<JsonRpcResponseEnum<Box<RawValue>>>, but that's more work
// TODO: response data should maybe be Arc<ForwardedResponse<Box<RawValue>>>, but that's more work
Ok(response_data)
}
}

View File

@ -4,7 +4,7 @@ use super::Web3ProxyApp;
use crate::errors::{Web3ProxyError, Web3ProxyResult};
use crate::frontend::authorization::{RequestOrMethod, Web3Request};
use crate::jsonrpc;
use crate::response_cache::JsonRpcResponseEnum;
use crate::response_cache::ForwardedResponse;
use axum::extract::ws::{CloseFrame, Message};
use deferred_rate_limiter::DeferredRateLimitResult;
use ethers::types::U64;
@ -107,7 +107,7 @@ impl Web3ProxyApp {
break;
}
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
// TODO: make a struct for this? using our SingleForwardedResponse won't work because it needs an id
let response_json = json!({
"jsonrpc": "2.0",
"method":"eth_subscription",
@ -121,7 +121,7 @@ impl Web3ProxyApp {
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
// we could use JsonRpcForwardedResponseEnum::num_bytes() here, but since we already have the string, this is easier
// we could use SingleForwardedResponseEnum::num_bytes() here, but since we already have the string, this is easier
let response_bytes = response_str.len();
// TODO: do clients support binary messages?
@ -208,7 +208,7 @@ impl Web3ProxyApp {
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
// we could use JsonRpcForwardedResponseEnum::num_bytes() here, but since we already have the string, this is easier
// we could use SingleForwardedResponseEnum::num_bytes() here, but since we already have the string, this is easier
let response_bytes = response_str.len();
subscription_web3_request.add_response(response_bytes);
@ -246,7 +246,7 @@ impl Web3ProxyApp {
// TODO: do something with subscription_join_handle?
let response_data = JsonRpcResponseEnum::from(json!(subscription_id));
let response_data = ForwardedResponse::from(json!(subscription_id));
let response =
jsonrpc::ParsedResponse::from_response_data(response_data, web3_request.id());

View File

@ -1,6 +1,6 @@
//! Helper functions for turning ether's BlockNumber into numbers and updating incoming queries to match.
use crate::app::Web3ProxyApp;
use crate::jsonrpc::JsonRpcRequest;
use crate::jsonrpc::SingleRequest;
use crate::{
errors::{Web3ProxyError, Web3ProxyResult},
rpcs::blockchain::Web3ProxyBlock,
@ -260,12 +260,12 @@ impl CacheMode {
/// like `try_new`, but instead of erroring, it will default to caching with the head block
/// returns None if this request should not be cached
pub async fn new<'a>(
request: &'a mut JsonRpcRequest,
request: &'a mut SingleRequest,
head_block: Option<&'a Web3ProxyBlock>,
app: Option<&'a Web3ProxyApp>,
) -> Self {
match Self::try_new(request, head_block, app).await {
Ok(x) => x,
Ok(x) => return x,
Err(Web3ProxyError::NoBlocksKnown) => {
warn!(
method = %request.method,
@ -294,7 +294,7 @@ impl CacheMode {
}
pub async fn try_new(
request: &mut JsonRpcRequest,
request: &mut SingleRequest,
head_block: Option<&Web3ProxyBlock>,
app: Option<&Web3ProxyApp>,
) -> Web3ProxyResult<Self> {
@ -519,7 +519,7 @@ mod test {
use super::CacheMode;
use crate::{
errors::Web3ProxyError,
jsonrpc::{JsonRpcId, JsonRpcRequest},
jsonrpc::{LooseId, SingleRequest},
rpcs::blockchain::Web3ProxyBlock,
};
use ethers::types::{Block, H256};
@ -539,9 +539,9 @@ mod test {
let head_block = Web3ProxyBlock::try_new(Arc::new(head_block)).unwrap();
let id = JsonRpcId::Number(9);
let id = LooseId::Number(9);
let mut request = JsonRpcRequest::new(id, method.to_string(), params).unwrap();
let mut request = SingleRequest::new(id, method.to_string(), params).unwrap();
// TODO: instead of empty, check None?
let x = CacheMode::try_new(&mut request, Some(&head_block), None)
@ -574,9 +574,9 @@ mod test {
let head_block = Web3ProxyBlock::try_new(Arc::new(head_block)).unwrap();
let id = JsonRpcId::Number(99);
let id = LooseId::Number(99);
let mut request = JsonRpcRequest::new(id, method.to_string(), params).unwrap();
let mut request = SingleRequest::new(id, method.to_string(), params).unwrap();
let x = CacheMode::try_new(&mut request, Some(&head_block), None)
.await
@ -611,7 +611,7 @@ mod test {
let head_block = Web3ProxyBlock::try_new(Arc::new(head_block)).unwrap();
let mut request = JsonRpcRequest::new(99.into(), method.to_string(), params).unwrap();
let mut request = SingleRequest::new(99.into(), method.to_string(), params).unwrap();
let x = CacheMode::try_new(&mut request, Some(&head_block), None)
.await

View File

@ -1,8 +1,8 @@
//! Utlities for logging errors for admins and displaying errors to users.
use crate::frontend::authorization::Authorization;
use crate::jsonrpc::{self, JsonRpcErrorData, JsonRpcForwardedResponse};
use crate::response_cache::JsonRpcResponseEnum;
use crate::jsonrpc::{self, JsonRpcErrorData, ParsedResponse};
use crate::response_cache::ForwardedResponse;
use crate::rpcs::provider::EthersHttpProvider;
use axum::extract::rejection::JsonRejection;
use axum::extract::ws::Message;
@ -139,11 +139,11 @@ pub enum Web3ProxyError {
/// make it easy to skip caching large results
#[error(ignore)]
#[display(fmt = "{:?}", _0)]
JsonRpcResponse(JsonRpcResponseEnum<Arc<RawValue>>),
JsonRpcResponse(ForwardedResponse<Arc<RawValue>>),
/// make it easy to skip caching streaming results
#[error(ignore)]
#[display(fmt = "{:?}", _0)]
StreamResponse(Mutex<Option<jsonrpc::StreamResponse>>),
StreamResponse(Mutex<Option<jsonrpc::StreamResponse<Arc<RawValue>>>>),
/// make it easy to skip caching null results
NullJsonRpcResult,
OriginRequired,
@ -215,7 +215,7 @@ impl Web3ProxyError {
/// turn the error into an axum response.
/// <https://www.jsonrpc.org/specification#error_object>
/// TODO? change to `to_response_parts(self)`
pub fn as_response_parts(&self) -> (StatusCode, JsonRpcResponseEnum<Arc<RawValue>>) {
pub fn as_response_parts(&self) -> (StatusCode, ForwardedResponse<Arc<RawValue>>) {
// TODO: include a unique request id in the data
let (code, err): (StatusCode, JsonRpcErrorData) = match self {
Self::Abi(err) => {
@ -846,7 +846,7 @@ impl Web3ProxyError {
unreachable!("stream is pulled out, not used here");
}
Self::NullJsonRpcResult => {
return (StatusCode::OK, JsonRpcResponseEnum::NullResult);
return (StatusCode::OK, ForwardedResponse::NullResult);
}
Self::OriginRequired => {
trace!("OriginRequired");
@ -1231,7 +1231,7 @@ impl Web3ProxyError {
},
};
(code, JsonRpcResponseEnum::from(err))
(code, ForwardedResponse::from(err))
}
#[inline]
@ -1240,7 +1240,7 @@ impl Web3ProxyError {
let id = id.unwrap_or_default();
let response = JsonRpcForwardedResponse::from_response_data(response_data, id);
let response = ParsedResponse::from_response_data(response_data, id);
(status_code, Json(response)).into_response()
}
@ -1310,7 +1310,7 @@ impl Web3ProxyError {
let id = id.unwrap_or_default();
let err = JsonRpcForwardedResponse::from_response_data(err, id);
let err = ParsedResponse::from_response_data(err, id);
let msg = serde_json::to_string(&err).expect("errors should always serialize to json");

View File

@ -7,7 +7,7 @@ use crate::block_number::CacheMode;
use crate::caches::RegisteredUserRateLimitKey;
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::globals::{global_db_replica_conn, APP};
use crate::jsonrpc::{self, JsonRpcId, JsonRpcParams, JsonRpcRequest};
use crate::jsonrpc::{self, JsonRpcParams, LooseId, SingleRequest};
use crate::kafka::KafkaDebugLogger;
use crate::response_cache::JsonRpcQueryCacheKey;
use crate::rpcs::blockchain::Web3ProxyBlock;
@ -131,7 +131,7 @@ impl Hash for RpcSecretKey {
#[derive(Debug, Default, From, Serialize)]
pub enum RequestOrMethod {
Request(JsonRpcRequest),
Request(SingleRequest),
/// sometimes we don't have a full request. for example, when we are logging a websocket subscription
Method(Cow<'static, str>, usize),
#[default]
@ -281,7 +281,7 @@ impl RequestOrMethod {
}
}
pub fn jsonrpc_request(&self) -> Option<&JsonRpcRequest> {
pub fn jsonrpc_request(&self) -> Option<&SingleRequest> {
match self {
Self::Request(x) => Some(x),
_ => None,
@ -443,10 +443,10 @@ impl Web3Request {
let authorization = Arc::new(Authorization::internal().unwrap());
// TODO: we need a real id! increment a counter on the app
let id = JsonRpcId::Number(1);
let id = LooseId::Number(1);
// TODO: this seems inefficient
let request = JsonRpcRequest::new(id, method, json!(params)).unwrap();
let request = SingleRequest::new(id, method, json!(params)).unwrap();
if let Some(app) = APP.get() {
Self::new_with_app(app, authorization, max_wait, request.into(), head_block).await

View File

@ -4,12 +4,8 @@
use super::authorization::{ip_is_authorized, key_is_authorized, Authorization, Web3Request};
use crate::errors::{Web3ProxyError, Web3ProxyResponse};
use crate::jsonrpc;
use crate::{
app::Web3ProxyApp,
errors::Web3ProxyResult,
jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest},
};
use crate::jsonrpc::{self, ParsedResponse};
use crate::{app::Web3ProxyApp, errors::Web3ProxyResult, jsonrpc::SingleRequest};
use axum::headers::{Origin, Referer, UserAgent};
use axum::{
extract::ws::{Message, WebSocket, WebSocketUpgrade},
@ -320,7 +316,7 @@ async fn proxy_web3_socket(
async fn websocket_proxy_web3_rpc(
app: &Arc<Web3ProxyApp>,
authorization: Arc<Authorization>,
json_request: JsonRpcRequest,
json_request: SingleRequest,
response_sender: &mpsc::Sender<Message>,
subscription_count: &AtomicU64,
subscriptions: &AsyncRwLock<HashMap<U64, AbortHandle>>,
@ -337,7 +333,7 @@ async fn websocket_proxy_web3_rpc(
.await
{
Ok((handle, response)) => {
if let jsonrpc::Payload::Success {
if let jsonrpc::ResponsePayload::Success {
result: ref subscription_id,
} = response.payload
{
@ -417,7 +413,7 @@ async fn handle_socket_payload(
let (authorization, semaphore) = authorization.check_again(app).await?;
// TODO: handle batched requests
let (response_id, response) = match serde_json::from_str::<JsonRpcRequest>(payload) {
let (response_id, response) = match serde_json::from_str::<SingleRequest>(payload) {
Ok(json_request) => {
let request_id = json_request.id.clone();
@ -442,7 +438,7 @@ async fn handle_socket_payload(
Err(err) => {
let (_, response_data) = err.as_response_parts();
let response = JsonRpcForwardedResponse::from_response_data(response_data, response_id);
let response = ParsedResponse::from_response_data(response_data, response_id);
serde_json::to_string(&response).expect("to_string should always work here")
}

View File

@ -1,860 +0,0 @@
use axum::body::StreamBody;
use axum::response::{IntoResponse, Response as AxumResponse};
use axum::Json;
use bytes::{Bytes, BytesMut};
use derive_more::From;
use futures_util::stream::{self, StreamExt};
use futures_util::TryStreamExt;
use serde::de::{self, Deserializer, MapAccess, SeqAccess, Visitor};
use serde::{Deserialize, Serialize};
use serde_inline_default::serde_inline_default;
use serde_json::json;
use serde_json::value::{to_raw_value, RawValue};
use std::borrow::Cow;
use std::fmt;
use std::marker::PhantomData;
use std::sync::{atomic, Arc};
use std::time::Duration;
use tokio::time::sleep;
use crate::app::Web3ProxyApp;
use crate::errors::{Web3ProxyError, Web3ProxyResult};
use crate::frontend::authorization::{Authorization, RequestOrMethod, Web3Request};
use crate::response_cache::JsonRpcResponseEnum;
pub trait JsonRpcParams = fmt::Debug + serde::Serialize + Send + Sync + 'static;
pub trait JsonRpcResultData = serde::Serialize + serde::de::DeserializeOwned + fmt::Debug + Send;
/// TODO: borrow values to avoid allocs if possible
#[derive(Debug, Serialize)]
pub struct ParsedResponse<T = Arc<RawValue>> {
pub jsonrpc: String,
pub id: Box<RawValue>,
#[serde(flatten)]
pub payload: Payload<T>,
}
impl ParsedResponse {
pub fn from_value(value: serde_json::Value, id: Box<RawValue>) -> Self {
let result = serde_json::value::to_raw_value(&value)
.expect("this should not fail")
.into();
Self::from_result(result, id)
}
}
impl ParsedResponse<Arc<RawValue>> {
pub fn from_response_data(data: JsonRpcResponseEnum<Arc<RawValue>>, id: Box<RawValue>) -> Self {
match data {
JsonRpcResponseEnum::NullResult => {
let x: Box<RawValue> = Default::default();
// TODO: how can we make this generic if this always wants to be a Box<RawValue>?. Do we even want to keep NullResult?
Self::from_result(Arc::from(x), id)
}
JsonRpcResponseEnum::RpcError { error_data, .. } => Self::from_error(error_data, id),
JsonRpcResponseEnum::Result { value, .. } => Self::from_result(value, id),
}
}
}
impl<T> ParsedResponse<T> {
pub fn from_result(result: T, id: Box<RawValue>) -> Self {
Self {
jsonrpc: "2.0".to_string(),
id,
payload: Payload::Success { result },
}
}
pub fn from_error(error: JsonRpcErrorData, id: Box<RawValue>) -> Self {
Self {
jsonrpc: "2.0".to_string(),
id,
payload: Payload::Error { error },
}
}
pub fn result(&self) -> Option<&T> {
match &self.payload {
Payload::Success { result } => Some(result),
Payload::Error { .. } => None,
}
}
pub fn into_result(self) -> Web3ProxyResult<T> {
match self.payload {
Payload::Success { result } => Ok(result),
Payload::Error { error } => Err(Web3ProxyError::JsonRpcErrorData(error)),
}
}
}
impl<'de, T> Deserialize<'de> for ParsedResponse<T>
where
T: de::DeserializeOwned,
{
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct ResponseVisitor<T>(PhantomData<T>);
impl<'de, T> de::Visitor<'de> for ResponseVisitor<T>
where
T: de::DeserializeOwned,
{
type Value = ParsedResponse<T>;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a valid jsonrpc 2.0 response object")
}
fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where
A: de::MapAccess<'de>,
{
let mut jsonrpc = None;
// response & error
let mut id = None;
// only response
let mut result = None;
// only error
let mut error = None;
while let Some(key) = map.next_key()? {
match key {
"jsonrpc" => {
if jsonrpc.is_some() {
return Err(de::Error::duplicate_field("jsonrpc"));
}
let value = map.next_value()?;
if value != "2.0" {
return Err(de::Error::invalid_value(
de::Unexpected::Str(value),
&"2.0",
));
}
jsonrpc = Some(value);
}
"id" => {
if id.is_some() {
return Err(de::Error::duplicate_field("id"));
}
let value: Box<RawValue> = map.next_value()?;
id = Some(value);
}
"result" => {
if result.is_some() {
return Err(de::Error::duplicate_field("result"));
}
let value: T = map.next_value()?;
result = Some(value);
}
"error" => {
if error.is_some() {
return Err(de::Error::duplicate_field("Error"));
}
let value: JsonRpcErrorData = map.next_value()?;
error = Some(value);
}
key => {
return Err(de::Error::unknown_field(
key,
&["jsonrpc", "id", "result", "error"],
));
}
}
}
let id = id.unwrap_or_default();
// jsonrpc version must be present in all responses
let jsonrpc = jsonrpc
.ok_or_else(|| de::Error::missing_field("jsonrpc"))?
.to_string();
let payload = match (result, error) {
(Some(result), None) => Payload::Success { result },
(None, Some(error)) => Payload::Error { error },
_ => {
return Err(de::Error::custom(
"response must be either a success or error object",
))
}
};
Ok(ParsedResponse {
jsonrpc,
id,
payload,
})
}
}
deserializer.deserialize_map(ResponseVisitor(PhantomData))
}
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(untagged)]
pub enum Payload<T> {
Success { result: T },
Error { error: JsonRpcErrorData },
}
#[derive(Debug)]
pub struct StreamResponse {
// TODO: phantom T on here?
buffer: Bytes,
response: reqwest::Response,
web3_request: Arc<Web3Request>,
}
impl StreamResponse {
// TODO: error handing
pub async fn read<T>(self) -> Web3ProxyResult<ParsedResponse<T>>
where
T: de::DeserializeOwned,
{
let mut buffer = BytesMut::with_capacity(self.buffer.len());
buffer.extend_from_slice(&self.buffer);
buffer.extend_from_slice(&self.response.bytes().await?);
let parsed = serde_json::from_slice(&buffer)?;
Ok(parsed)
}
}
impl IntoResponse for StreamResponse {
fn into_response(self) -> axum::response::Response {
let stream = stream::once(async { Ok::<_, reqwest::Error>(self.buffer) })
.chain(self.response.bytes_stream())
.map_ok(move |x| {
let len = x.len();
self.web3_request.add_response(len);
x
});
let body = StreamBody::new(stream);
body.into_response()
}
}
#[derive(Debug)]
pub enum SingleResponse<T = Arc<RawValue>> {
/// TODO: save the size here so we don't have to serialize again
Parsed(ParsedResponse<T>),
Stream(StreamResponse),
}
impl<T> SingleResponse<T>
where
T: de::DeserializeOwned + Serialize,
{
// TODO: threshold from configs
// TODO: error handling
// TODO: if a large stream's response's initial chunk "error" then we should buffer it
pub async fn read_if_short(
mut response: reqwest::Response,
nbytes: u64,
web3_request: &Arc<Web3Request>,
) -> Web3ProxyResult<SingleResponse<T>> {
Ok(Self::from_bytes(response.bytes().await?)?)
/*
match response.content_length() {
// short
Some(len) if len <= nbytes => Ok(Self::from_bytes(response.bytes().await?)?),
// long
Some(_) => Ok(Self::Stream(StreamResponse {
buffer: Bytes::new(),
response,
web3_request: web3_request.clone(),
})),
// unknown length. maybe compressed. maybe streaming. maybe both
None => {
let mut buffer = BytesMut::new();
while (buffer.len() as u64) < nbytes {
match response.chunk().await? {
Some(chunk) => {
buffer.extend_from_slice(&chunk);
}
None => {
// it was short
return Ok(Self::from_bytes(buffer.freeze())?);
}
}
}
// we've read nbytes of the response, but there is more to come
let buffer = buffer.freeze();
Ok(Self::Stream(StreamResponse {
buffer,
response,
web3_request: web3_request.clone(),
}))
}
}
*/
}
fn from_bytes(buf: Bytes) -> Result<Self, serde_json::Error> {
let val = serde_json::from_slice(&buf)?;
Ok(Self::Parsed(val))
}
// TODO: error handling
pub async fn parsed(self) -> Web3ProxyResult<ParsedResponse<T>> {
match self {
Self::Parsed(resp) => Ok(resp),
Self::Stream(resp) => resp.read().await,
}
}
pub fn num_bytes(&self) -> usize {
match self {
Self::Parsed(response) => serde_json::to_string(response)
.expect("this should always serialize")
.len(),
Self::Stream(response) => match response.response.content_length() {
Some(len) => len as usize,
None => 0,
},
}
}
pub fn set_id(&mut self, id: Box<RawValue>) {
match self {
SingleResponse::Parsed(x) => {
x.id = id;
}
SingleResponse::Stream(..) => {
// stream responses will hopefully always have the right id already because we pass the orignal id all the way from the front to the back
}
}
}
}
impl<T> From<ParsedResponse<T>> for SingleResponse<T> {
fn from(response: ParsedResponse<T>) -> Self {
Self::Parsed(response)
}
}
impl<T> IntoResponse for SingleResponse<T>
where
T: Serialize,
{
fn into_response(self) -> axum::response::Response {
match self {
Self::Parsed(resp) => Json(resp).into_response(),
Self::Stream(resp) => resp.into_response(),
}
}
}
#[derive(Debug)]
pub enum Response<T = Arc<RawValue>> {
Single(SingleResponse<T>),
Batch(Vec<ParsedResponse<T>>),
}
impl Response<Arc<RawValue>> {
pub async fn to_json_string(self) -> Web3ProxyResult<String> {
let x = match self {
Self::Single(resp) => {
// TODO: handle streaming differently?
let parsed = resp.parsed().await?;
serde_json::to_string(&parsed)
}
Self::Batch(resps) => serde_json::to_string(&resps),
};
let x = x.expect("to_string should always work");
Ok(x)
}
}
impl<T> From<ParsedResponse<T>> for Response<T> {
fn from(response: ParsedResponse<T>) -> Self {
Self::Single(SingleResponse::Parsed(response))
}
}
impl<T> IntoResponse for Response<T>
where
T: Serialize,
{
fn into_response(self) -> axum::response::Response {
match self {
Self::Single(resp) => resp.into_response(),
Self::Batch(resps) => Json(resps).into_response(),
}
}
}
// TODO: &str here instead of String should save a lot of allocations
// TODO: generic type for params?
#[serde_inline_default]
#[derive(Clone, Deserialize, Serialize)]
pub struct JsonRpcRequest {
pub jsonrpc: String,
/// id could be a stricter type, but many rpcs do things against the spec
/// TODO: this gets cloned into the response object often. would an Arc be better? That has its own overhead and these are short strings
pub id: Box<RawValue>,
pub method: String,
#[serde_inline_default(serde_json::Value::Null)]
pub params: serde_json::Value,
}
#[derive(From)]
pub enum JsonRpcId {
None,
Number(u64),
String(String),
Raw(Box<RawValue>),
}
impl JsonRpcId {
pub fn to_raw_value(self) -> Box<RawValue> {
// TODO: is this a good way to do this? we should probably use references
match self {
Self::None => Default::default(),
Self::Number(x) => {
serde_json::from_value(json!(x)).expect("number id should always work")
}
Self::String(x) => serde_json::from_str(&x).expect("string id should always work"),
Self::Raw(x) => x,
}
}
}
impl JsonRpcRequest {
// TODO: Web3ProxyResult? can this even fail?
pub fn new(id: JsonRpcId, method: String, params: serde_json::Value) -> anyhow::Result<Self> {
let x = Self {
jsonrpc: "2.0".to_string(),
id: id.to_raw_value(),
method,
params,
};
Ok(x)
}
pub fn validate_method(&self) -> bool {
self.method
.chars()
.all(|x| x.is_ascii_alphanumeric() || x == '_' || x == '(' || x == ')')
}
}
impl fmt::Debug for JsonRpcRequest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
// TODO: how should we include params in this? maybe just the length?
f.debug_struct("JsonRpcRequest")
.field("id", &self.id)
.field("method", &self.method)
.field("params", &self.params)
.finish()
}
}
/// Requests can come in multiple formats
#[derive(Debug, From, Serialize)]
pub enum JsonRpcRequestEnum {
Batch(Vec<JsonRpcRequest>),
Single(JsonRpcRequest),
}
impl JsonRpcRequestEnum {
pub fn first_id(&self) -> Option<Box<RawValue>> {
match self {
Self::Batch(x) => x.first().map(|x| x.id.clone()),
Self::Single(x) => Some(x.id.clone()),
}
}
/// returns the id of the first invalid result (if any). None is good
pub fn validate(&self) -> Option<Box<RawValue>> {
match self {
Self::Batch(x) => x
.iter()
.find_map(|x| (!x.validate_method()).then_some(x.id.clone())),
Self::Single(x) => {
if x.validate_method() {
None
} else {
Some(x.id.clone())
}
}
}
}
/// returns the id of the first invalid result (if any). None is good
pub async fn tarpit_invalid(
&self,
app: &Arc<Web3ProxyApp>,
authorization: &Arc<Authorization>,
duration: Duration,
) -> Result<(), AxumResponse> {
let err_id = match self.validate() {
None => return Ok(()),
Some(x) => x,
};
let size = serde_json::to_string(&self)
.expect("JsonRpcRequestEnum should always serialize")
.len();
// TODO: create a stat so we can penalize
// TODO: what request size
let request = Web3Request::new_with_app(
app,
authorization.clone(),
None,
RequestOrMethod::Method("invalid_method".into(), size),
None,
)
.await
.unwrap();
request
.user_error_response
.store(true, atomic::Ordering::Release);
let response = Web3ProxyError::BadRequest("request failed validation".into());
request.add_response(&response);
let response = response.into_response_with_id(Some(err_id));
// TODO: variable duration depending on the IP
sleep(duration).await;
let _ = request.try_send_arc_stat();
Err(response)
}
}
impl<'de> Deserialize<'de> for JsonRpcRequestEnum {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(field_identifier, rename_all = "lowercase")]
enum Field {
JsonRpc,
Id,
Method,
Params,
}
struct JsonRpcBatchVisitor;
impl<'de> Visitor<'de> for JsonRpcBatchVisitor {
type Value = JsonRpcRequestEnum;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("JsonRpcRequestEnum")
}
fn visit_seq<V>(self, mut seq: V) -> Result<JsonRpcRequestEnum, V::Error>
where
V: SeqAccess<'de>,
{
// TODO: what size should we use as the default?
let mut batch: Vec<JsonRpcRequest> =
Vec::with_capacity(seq.size_hint().unwrap_or(10));
while let Ok(Some(s)) = seq.next_element::<JsonRpcRequest>() {
batch.push(s);
}
Ok(JsonRpcRequestEnum::Batch(batch))
}
fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where
A: MapAccess<'de>,
{
// TODO: i feel like this should be easier
let mut jsonrpc = None;
let mut id = None;
let mut method = None;
let mut params = None;
while let Some(key) = map.next_key()? {
match key {
Field::JsonRpc => {
// throw away the value
// TODO: should we check that it's 2.0?
// TODO: how do we skip over this value entirely?
jsonrpc = Some(map.next_value()?);
}
Field::Id => {
if id.is_some() {
return Err(de::Error::duplicate_field("id"));
}
id = Some(map.next_value()?);
}
Field::Method => {
if method.is_some() {
return Err(de::Error::duplicate_field("method"));
}
method = Some(map.next_value()?);
}
Field::Params => {
if params.is_some() {
return Err(de::Error::duplicate_field("params"));
}
params = Some(map.next_value()?);
}
}
}
// some providers don't follow the spec and dont include the jsonrpc key
// i think "2.0" should be a fine default to handle these incompatible clones
let jsonrpc = jsonrpc.unwrap_or_else(|| "2.0".to_string());
// TODO: Errors returned by the try operator get shown in an ugly way
let id = id.ok_or_else(|| de::Error::missing_field("id"))?;
let method = method.ok_or_else(|| de::Error::missing_field("method"))?;
let single = JsonRpcRequest {
jsonrpc,
id,
method,
params: params.unwrap_or_default(),
};
Ok(JsonRpcRequestEnum::Single(single))
}
}
let batch_visitor = JsonRpcBatchVisitor {};
deserializer.deserialize_any(batch_visitor)
}
}
// TODO: impl Error on this?
/// All jsonrpc errors use this structure
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct JsonRpcErrorData {
/// The error code
pub code: i64,
/// The error message
pub message: Cow<'static, str>,
/// Additional data
#[serde(skip_serializing_if = "Option::is_none")]
pub data: Option<serde_json::Value>,
}
impl JsonRpcErrorData {
pub fn num_bytes(&self) -> usize {
serde_json::to_string(self)
.expect("should always serialize")
.len()
}
pub fn is_retryable(&self) -> bool {
// TODO: move stuff from request to here
todo!()
}
}
impl From<&'static str> for JsonRpcErrorData {
fn from(value: &'static str) -> Self {
Self {
code: -32000,
message: value.into(),
data: None,
}
}
}
impl From<String> for JsonRpcErrorData {
fn from(value: String) -> Self {
Self {
code: -32000,
message: value.into(),
data: None,
}
}
}
/// A complete response
/// TODO: better Debug response
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct JsonRpcForwardedResponse {
// TODO: jsonrpc a &str?
pub jsonrpc: &'static str,
pub id: Box<RawValue>,
#[serde(skip_serializing_if = "Option::is_none")]
pub result: Option<Arc<RawValue>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<JsonRpcErrorData>,
}
impl JsonRpcRequest {
pub fn num_bytes(&self) -> usize {
// TODO: not sure how to do this without wasting a ton of allocations
serde_json::to_string(self)
.expect("this should always be valid json")
.len()
}
}
impl JsonRpcForwardedResponse {
pub fn from_anyhow_error(err: anyhow::Error, code: Option<i64>, id: Box<RawValue>) -> Self {
let message = format!("{:?}", err);
Self::from_string(message, code, id)
}
pub fn from_str(message: &str, code: Option<i64>, id: Box<RawValue>) -> Self {
Self::from_string(message.to_string(), code, id)
}
pub fn from_string(message: String, code: Option<i64>, id: Box<RawValue>) -> Self {
// TODO: this is too verbose. plenty of errors are valid, like users giving an invalid address. no need to log that
// TODO: can we somehow get the initial request here? if we put that into a tracing span, will things slow down a ton?
JsonRpcForwardedResponse {
jsonrpc: "2.0",
id,
result: None,
error: Some(JsonRpcErrorData {
code: code.unwrap_or(-32099),
message: message.into(),
// TODO: accept data as an argument
data: None,
}),
}
}
pub fn from_raw_response(result: Arc<RawValue>, id: Box<RawValue>) -> Self {
JsonRpcForwardedResponse {
jsonrpc: "2.0",
id,
// TODO: since we only use the result here, should that be all we return from try_send_request?
result: Some(result),
error: None,
}
}
pub fn from_value(result: serde_json::Value, id: Box<RawValue>) -> Self {
let partial_response = to_raw_value(&result).expect("Value to RawValue should always work");
// TODO: an Arc is a waste here. change JsonRpcForwardedResponse to take an enum?
let partial_response = partial_response.into();
JsonRpcForwardedResponse {
jsonrpc: "2.0",
id,
result: Some(partial_response),
error: None,
}
}
pub fn from_response_data(data: JsonRpcResponseEnum<Arc<RawValue>>, id: Box<RawValue>) -> Self {
match data {
JsonRpcResponseEnum::NullResult => {
let x: Box<RawValue> = Default::default();
Self::from_raw_response(x.into(), id)
}
JsonRpcResponseEnum::Result { value, .. } => Self::from_raw_response(value, id),
JsonRpcResponseEnum::RpcError {
error_data: value, ..
} => JsonRpcForwardedResponse {
jsonrpc: "2.0",
id,
result: None,
error: Some(value),
},
}
}
}
/// JSONRPC Responses can include one or many response objects.
#[derive(Clone, Debug, From, Serialize)]
#[serde(untagged)]
pub enum JsonRpcForwardedResponseEnum {
Single(JsonRpcForwardedResponse),
Batch(Vec<JsonRpcForwardedResponse>),
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn deserialize_response() {
let json = r#"{"jsonrpc":"2.0","id":null,"result":100}"#;
let obj: ParsedResponse = serde_json::from_str(json).unwrap();
assert!(matches!(obj.payload, Payload::Success { .. }));
}
#[test]
fn serialize_response() {
let obj = ParsedResponse {
jsonrpc: "2.0".to_string(),
id: Default::default(),
payload: Payload::Success {
result: serde_json::value::RawValue::from_string("100".to_string()).unwrap(),
},
};
let json = serde_json::to_string(&obj).unwrap();
assert_eq!(json, r#"{"jsonrpc":"2.0","id":null,"result":100}"#);
}
#[test]
fn this_deserialize_single() {
let input = r#"{"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":1}"#;
// test deserializing it directly to a single request object
let output: JsonRpcRequest = serde_json::from_str(input).unwrap();
assert_eq!(output.id.to_string(), "1");
assert_eq!(output.method, "eth_blockNumber");
assert_eq!(output.params.to_string(), "[]");
// test deserializing it into an enum
let output: JsonRpcRequestEnum = serde_json::from_str(input).unwrap();
assert!(matches!(output, JsonRpcRequestEnum::Single(_)));
}
#[test]
fn this_deserialize_batch() {
let input = r#"[{"jsonrpc":"2.0","method":"eth_getCode","params":["0x5ba1e12693dc8f9c48aad8770482f4739beed696","0xe0e6a4"],"id":27},{"jsonrpc":"2.0","method":"eth_getTransactionCount","params":["0x5ba1e12693dc8f9c48aad8770482f4739beed696","0xe0e6a4"],"id":28},{"jsonrpc":"2.0","method":"eth_getBalance","params":["0x5ba1e12693dc8f9c48aad8770482f4739beed696","0xe0e6a4"],"id":29}]"#;
// test deserializing it directly to a batch of request objects
let output: Vec<JsonRpcRequest> = serde_json::from_str(input).unwrap();
assert_eq!(output.len(), 3);
assert_eq!(output[0].id.to_string(), "27");
assert_eq!(output[0].method, "eth_getCode");
assert_eq!(
output[0].params.to_string(),
r#"["0x5ba1e12693dc8f9c48aad8770482f4739beed696","0xe0e6a4"]"#
);
assert_eq!(output[1].id.to_string(), "28");
assert_eq!(output[2].id.to_string(), "29");
// test deserializing it into an enum
let output: JsonRpcRequestEnum = serde_json::from_str(input).unwrap();
assert!(matches!(output, JsonRpcRequestEnum::Batch(_)));
}
}

View File

@ -0,0 +1,48 @@
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
// TODO: impl Error on this?
/// All jsonrpc errors use this structure
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct JsonRpcErrorData {
/// The error code
pub code: i64,
/// The error message
pub message: Cow<'static, str>,
/// Additional data
#[serde(skip_serializing_if = "Option::is_none")]
pub data: Option<serde_json::Value>,
}
impl JsonRpcErrorData {
pub fn num_bytes(&self) -> usize {
serde_json::to_string(self)
.expect("should always serialize")
.len()
}
pub fn is_retryable(&self) -> bool {
// TODO: move stuff from request to here
todo!()
}
}
impl From<&'static str> for JsonRpcErrorData {
fn from(value: &'static str) -> Self {
Self {
code: -32000,
message: value.into(),
data: None,
}
}
}
impl From<String> for JsonRpcErrorData {
fn from(value: String) -> Self {
Self {
code: -32000,
message: value.into(),
data: None,
}
}
}

View File

@ -0,0 +1,25 @@
use derive_more::From;
use serde_json::{json, value::RawValue};
/// being strict on id doesn't really help much. just accept anything
#[derive(From)]
pub enum LooseId {
None,
Number(u64),
String(String),
Raw(Box<RawValue>),
}
impl LooseId {
pub fn to_raw_value(self) -> Box<RawValue> {
// TODO: is this a good way to do this? maybe also have `as_raw_value`?
match self {
Self::None => Default::default(),
Self::Number(x) => {
serde_json::from_value(json!(x)).expect("number id should always work")
}
Self::String(x) => serde_json::from_str(&x).expect("string id should always work"),
Self::Raw(x) => x,
}
}
}

View File

@ -0,0 +1,85 @@
pub mod error;
pub mod id;
pub mod request;
pub mod request_builder;
pub mod response;
use std::fmt;
pub use self::error::JsonRpcErrorData;
pub use self::id::LooseId;
pub use self::request::{JsonRpcRequestEnum, SingleRequest};
pub use self::response::{
ParsedResponse, Response, ResponsePayload, SingleResponse, StreamResponse,
};
pub trait JsonRpcParams = fmt::Debug + serde::Serialize + Send + Sync + 'static;
pub trait JsonRpcResultData = serde::Serialize + serde::de::DeserializeOwned + fmt::Debug + Send;
#[cfg(test)]
mod tests {
use super::request::{JsonRpcRequestEnum, SingleRequest};
use super::response::{ParsedResponse, ResponsePayload};
#[test]
fn deserialize_response() {
let json = r#"{"jsonrpc":"2.0","id":null,"result":100}"#;
let obj: ParsedResponse = serde_json::from_str(json).unwrap();
assert!(matches!(obj.payload, ResponsePayload::Success { .. }));
}
#[test]
fn serialize_response() {
let obj = ParsedResponse {
jsonrpc: "2.0".to_string(),
id: Default::default(),
payload: ResponsePayload::Success {
result: serde_json::value::RawValue::from_string("100".to_string()).unwrap(),
},
};
let json = serde_json::to_string(&obj).unwrap();
assert_eq!(json, r#"{"jsonrpc":"2.0","id":null,"result":100}"#);
}
#[test]
fn this_deserialize_single() {
let input = r#"{"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":1}"#;
// test deserializing it directly to a single request object
let output: SingleRequest = serde_json::from_str(input).unwrap();
assert_eq!(output.id.to_string(), "1");
assert_eq!(output.method, "eth_blockNumber");
assert_eq!(output.params.to_string(), "[]");
// test deserializing it into an enum
let output: JsonRpcRequestEnum = serde_json::from_str(input).unwrap();
assert!(matches!(output, JsonRpcRequestEnum::Single(_)));
}
#[test]
fn this_deserialize_batch() {
let input = r#"[{"jsonrpc":"2.0","method":"eth_getCode","params":["0x5ba1e12693dc8f9c48aad8770482f4739beed696","0xe0e6a4"],"id":27},{"jsonrpc":"2.0","method":"eth_getTransactionCount","params":["0x5ba1e12693dc8f9c48aad8770482f4739beed696","0xe0e6a4"],"id":28},{"jsonrpc":"2.0","method":"eth_getBalance","params":["0x5ba1e12693dc8f9c48aad8770482f4739beed696","0xe0e6a4"],"id":29}]"#;
// test deserializing it directly to a batch of request objects
let output: Vec<SingleRequest> = serde_json::from_str(input).unwrap();
assert_eq!(output.len(), 3);
assert_eq!(output[0].id.to_string(), "27");
assert_eq!(output[0].method, "eth_getCode");
assert_eq!(
output[0].params.to_string(),
r#"["0x5ba1e12693dc8f9c48aad8770482f4739beed696","0xe0e6a4"]"#
);
assert_eq!(output[1].id.to_string(), "28");
assert_eq!(output[2].id.to_string(), "29");
// test deserializing it into an enum
let output: JsonRpcRequestEnum = serde_json::from_str(input).unwrap();
assert!(matches!(output, JsonRpcRequestEnum::Batch(_)));
}
}

View File

@ -0,0 +1,248 @@
use axum::response::Response as AxumResponse;
use derive_more::From;
use serde::de::{self, Deserializer, MapAccess, SeqAccess, Visitor};
use serde::{Deserialize, Serialize};
use serde_inline_default::serde_inline_default;
use serde_json::value::RawValue;
use std::fmt;
use std::sync::{atomic, Arc};
use std::time::Duration;
use tokio::time::sleep;
use crate::app::Web3ProxyApp;
use crate::errors::Web3ProxyError;
use crate::frontend::authorization::{Authorization, RequestOrMethod, Web3Request};
use super::LooseId;
// TODO: &str here instead of String should save a lot of allocations
// TODO: generic type for params?
#[serde_inline_default]
#[derive(Clone, Deserialize, Serialize)]
pub struct SingleRequest {
pub jsonrpc: String,
/// id could be a stricter type, but many rpcs do things against the spec
/// TODO: this gets cloned into the response object often. would an Arc be better? That has its own overhead and these are short strings
pub id: Box<RawValue>,
pub method: String,
#[serde_inline_default(serde_json::Value::Null)]
pub params: serde_json::Value,
}
impl SingleRequest {
// TODO: Web3ProxyResult? can this even fail?
pub fn new(id: LooseId, method: String, params: serde_json::Value) -> anyhow::Result<Self> {
let x = Self {
jsonrpc: "2.0".to_string(),
id: id.to_raw_value(),
method,
params,
};
Ok(x)
}
/// TODO: not sure how to do this without wasting a ton of allocations
pub fn num_bytes(&self) -> usize {
serde_json::to_string(self)
.expect("this should always be valid json")
.len()
}
pub fn validate_method(&self) -> bool {
self.method
.chars()
.all(|x| x.is_ascii_alphanumeric() || x == '_' || x == '(' || x == ')')
}
}
impl fmt::Debug for SingleRequest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
// TODO: how should we include params in this? maybe just the length?
f.debug_struct("JsonRpcRequest")
.field("id", &self.id)
.field("method", &self.method)
.field("params", &self.params)
.finish()
}
}
/// Requests can come in multiple formats
#[derive(Debug, From, Serialize)]
pub enum JsonRpcRequestEnum {
Batch(Vec<SingleRequest>),
Single(SingleRequest),
}
impl JsonRpcRequestEnum {
pub fn first_id(&self) -> Option<Box<RawValue>> {
match self {
Self::Batch(x) => x.first().map(|x| x.id.clone()),
Self::Single(x) => Some(x.id.clone()),
}
}
/// returns the id of the first invalid result (if any). None is good
pub fn validate(&self) -> Option<Box<RawValue>> {
match self {
Self::Batch(x) => x
.iter()
.find_map(|x| (!x.validate_method()).then_some(x.id.clone())),
Self::Single(x) => {
if x.validate_method() {
None
} else {
Some(x.id.clone())
}
}
}
}
/// returns the id of the first invalid result (if any). None is good
pub async fn tarpit_invalid(
&self,
app: &Arc<Web3ProxyApp>,
authorization: &Arc<Authorization>,
duration: Duration,
) -> Result<(), AxumResponse> {
let err_id = match self.validate() {
None => return Ok(()),
Some(x) => x,
};
let size = serde_json::to_string(&self)
.expect("JsonRpcRequestEnum should always serialize")
.len();
// TODO: create a stat so we can penalize
// TODO: what request size
let request = Web3Request::new_with_app(
app,
authorization.clone(),
None,
RequestOrMethod::Method("invalid_method".into(), size),
None,
)
.await
.unwrap();
request
.user_error_response
.store(true, atomic::Ordering::Release);
let response = Web3ProxyError::BadRequest("request failed validation".into());
request.add_response(&response);
let response = response.into_response_with_id(Some(err_id));
// TODO: variable duration depending on the IP
sleep(duration).await;
let _ = request.try_send_arc_stat();
Err(response)
}
}
impl<'de> Deserialize<'de> for JsonRpcRequestEnum {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(field_identifier, rename_all = "lowercase")]
enum Field {
JsonRpc,
Id,
Method,
Params,
}
struct JsonRpcBatchVisitor;
impl<'de> Visitor<'de> for JsonRpcBatchVisitor {
type Value = JsonRpcRequestEnum;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("JsonRpcRequestEnum")
}
fn visit_seq<V>(self, mut seq: V) -> Result<JsonRpcRequestEnum, V::Error>
where
V: SeqAccess<'de>,
{
// TODO: what size should we use as the default?
let mut batch: Vec<SingleRequest> =
Vec::with_capacity(seq.size_hint().unwrap_or(10));
while let Ok(Some(s)) = seq.next_element::<SingleRequest>() {
batch.push(s);
}
Ok(JsonRpcRequestEnum::Batch(batch))
}
fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where
A: MapAccess<'de>,
{
// TODO: i feel like this should be easier
let mut jsonrpc = None;
let mut id = None;
let mut method = None;
let mut params = None;
while let Some(key) = map.next_key()? {
match key {
Field::JsonRpc => {
// throw away the value
// TODO: should we check that it's 2.0?
// TODO: how do we skip over this value entirely?
jsonrpc = Some(map.next_value()?);
}
Field::Id => {
if id.is_some() {
return Err(de::Error::duplicate_field("id"));
}
id = Some(map.next_value()?);
}
Field::Method => {
if method.is_some() {
return Err(de::Error::duplicate_field("method"));
}
method = Some(map.next_value()?);
}
Field::Params => {
if params.is_some() {
return Err(de::Error::duplicate_field("params"));
}
params = Some(map.next_value()?);
}
}
}
// some providers don't follow the spec and dont include the jsonrpc key
// i think "2.0" should be a fine default to handle these incompatible clones
let jsonrpc = jsonrpc.unwrap_or_else(|| "2.0".to_string());
// TODO: Errors returned by the try operator get shown in an ugly way
let id = id.ok_or_else(|| de::Error::missing_field("id"))?;
let method = method.ok_or_else(|| de::Error::missing_field("method"))?;
let single = SingleRequest {
jsonrpc,
id,
method,
params: params.unwrap_or_default(),
};
Ok(JsonRpcRequestEnum::Single(single))
}
}
let batch_visitor = JsonRpcBatchVisitor {};
deserializer.deserialize_any(batch_visitor)
}
}

View File

@ -0,0 +1,395 @@
use axum::body::StreamBody;
use axum::response::IntoResponse;
use axum::Json;
use bytes::{Bytes, BytesMut};
use futures_util::stream::{self, StreamExt};
use futures_util::TryStreamExt;
use serde::{de, Deserialize, Serialize};
use serde_json::value::RawValue;
use std::fmt;
use std::marker::PhantomData;
use std::sync::Arc;
use crate::errors::{Web3ProxyError, Web3ProxyResult};
use crate::frontend::authorization::Web3Request;
use crate::response_cache::ForwardedResponse;
use super::JsonRpcErrorData;
pub trait JsonRpcParams = fmt::Debug + serde::Serialize + Send + Sync + 'static;
pub trait JsonRpcResultData = serde::Serialize + serde::de::DeserializeOwned + fmt::Debug + Send;
/// TODO: borrow values to avoid allocs if possible
/// TODO: lots of overlap with `SingleForwardedResponse`
#[derive(Debug, Serialize)]
pub struct ParsedResponse<T = Arc<RawValue>> {
pub jsonrpc: String,
pub id: Box<RawValue>,
#[serde(flatten)]
pub payload: ResponsePayload<T>,
}
impl ParsedResponse {
pub fn from_value(value: serde_json::Value, id: Box<RawValue>) -> Self {
let result = serde_json::value::to_raw_value(&value)
.expect("this should not fail")
.into();
Self::from_result(result, id)
}
}
impl ParsedResponse<Arc<RawValue>> {
pub fn from_response_data(data: ForwardedResponse<Arc<RawValue>>, id: Box<RawValue>) -> Self {
match data {
ForwardedResponse::NullResult => {
let x: Box<RawValue> = Default::default();
// TODO: how can we make this generic if this always wants to be a Box<RawValue>?. Do we even want to keep NullResult?
Self::from_result(Arc::from(x), id)
}
ForwardedResponse::RpcError { error_data, .. } => Self::from_error(error_data, id),
ForwardedResponse::Result { value, .. } => Self::from_result(value, id),
}
}
}
impl<T> ParsedResponse<T> {
pub fn from_result(result: T, id: Box<RawValue>) -> Self {
Self {
jsonrpc: "2.0".to_string(),
id,
payload: ResponsePayload::Success { result },
}
}
pub fn from_error(error: JsonRpcErrorData, id: Box<RawValue>) -> Self {
Self {
jsonrpc: "2.0".to_string(),
id,
payload: ResponsePayload::Error { error },
}
}
pub fn result(&self) -> Option<&T> {
match &self.payload {
ResponsePayload::Success { result } => Some(result),
ResponsePayload::Error { .. } => None,
}
}
pub fn into_result(self) -> Web3ProxyResult<T> {
match self.payload {
ResponsePayload::Success { result } => Ok(result),
ResponsePayload::Error { error } => Err(Web3ProxyError::JsonRpcErrorData(error)),
}
}
}
impl<'de, T> Deserialize<'de> for ParsedResponse<T>
where
T: de::DeserializeOwned,
{
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct ResponseVisitor<T>(PhantomData<T>);
impl<'de, T> de::Visitor<'de> for ResponseVisitor<T>
where
T: de::DeserializeOwned,
{
type Value = ParsedResponse<T>;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a valid jsonrpc 2.0 response object")
}
fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where
A: de::MapAccess<'de>,
{
let mut jsonrpc = None;
// response & error
let mut id = None;
// only response
let mut result = None;
// only error
let mut error = None;
while let Some(key) = map.next_key()? {
match key {
"jsonrpc" => {
if jsonrpc.is_some() {
return Err(de::Error::duplicate_field("jsonrpc"));
}
let value = map.next_value()?;
if value != "2.0" {
return Err(de::Error::invalid_value(
de::Unexpected::Str(value),
&"2.0",
));
}
jsonrpc = Some(value);
}
"id" => {
if id.is_some() {
return Err(de::Error::duplicate_field("id"));
}
let value: Box<RawValue> = map.next_value()?;
id = Some(value);
}
"result" => {
if result.is_some() {
return Err(de::Error::duplicate_field("result"));
}
let value: T = map.next_value()?;
result = Some(value);
}
"error" => {
if error.is_some() {
return Err(de::Error::duplicate_field("Error"));
}
let value: JsonRpcErrorData = map.next_value()?;
error = Some(value);
}
key => {
return Err(de::Error::unknown_field(
key,
&["jsonrpc", "id", "result", "error"],
));
}
}
}
let id = id.unwrap_or_default();
// jsonrpc version must be present in all responses
let jsonrpc = jsonrpc
.ok_or_else(|| de::Error::missing_field("jsonrpc"))?
.to_string();
let payload = match (result, error) {
(Some(result), None) => ResponsePayload::Success { result },
(None, Some(error)) => ResponsePayload::Error { error },
_ => {
return Err(de::Error::custom(
"response must be either a success or error object",
))
}
};
Ok(ParsedResponse {
jsonrpc,
id,
payload,
})
}
}
deserializer.deserialize_map(ResponseVisitor(PhantomData))
}
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(untagged)]
pub enum ResponsePayload<T> {
Success { result: T },
Error { error: JsonRpcErrorData },
}
#[derive(Debug)]
pub struct StreamResponse<T> {
_t: PhantomData<T>,
buffer: Bytes,
response: reqwest::Response,
web3_request: Arc<Web3Request>,
}
impl<T> StreamResponse<T> {
// TODO: error handing
pub async fn read(self) -> Web3ProxyResult<ParsedResponse<T>>
where
T: de::DeserializeOwned,
{
let mut buffer = BytesMut::with_capacity(self.buffer.len());
buffer.extend_from_slice(&self.buffer);
buffer.extend_from_slice(&self.response.bytes().await?);
let parsed = serde_json::from_slice(&buffer)?;
Ok(parsed)
}
}
impl<T> IntoResponse for StreamResponse<T> {
fn into_response(self) -> axum::response::Response {
let stream = stream::once(async { Ok::<_, reqwest::Error>(self.buffer) })
.chain(self.response.bytes_stream())
.map_ok(move |x| {
let len = x.len();
self.web3_request.add_response(len);
x
});
let body = StreamBody::new(stream);
body.into_response()
}
}
#[derive(Debug)]
pub enum SingleResponse<T = Arc<RawValue>> {
/// TODO: save the size here so we don't have to serialize again
Parsed(ParsedResponse<T>),
Stream(StreamResponse<T>),
}
impl<T> SingleResponse<T>
where
T: de::DeserializeOwned + Serialize,
{
// TODO: threshold from configs
// TODO: error handling
// TODO: if a large stream's response's initial chunk "error" then we should buffer it
pub async fn read_if_short(
mut response: reqwest::Response,
nbytes: u64,
web3_request: &Arc<Web3Request>,
) -> Web3ProxyResult<SingleResponse<T>> {
Ok(Self::from_bytes(response.bytes().await?)?)
/*
match response.content_length() {
// short
Some(len) if len <= nbytes => Ok(Self::from_bytes(response.bytes().await?)?),
// long
Some(_) => Ok(Self::Stream(StreamResponse {
buffer: Bytes::new(),
response,
web3_request: web3_request.clone(),
})),
// unknown length. maybe compressed. maybe streaming. maybe both
None => {
let mut buffer = BytesMut::new();
while (buffer.len() as u64) < nbytes {
match response.chunk().await? {
Some(chunk) => {
buffer.extend_from_slice(&chunk);
}
None => {
// it was short
return Ok(Self::from_bytes(buffer.freeze())?);
}
}
}
// we've read nbytes of the response, but there is more to come
let buffer = buffer.freeze();
Ok(Self::Stream(StreamResponse {
buffer,
response,
web3_request: web3_request.clone(),
}))
}
}
*/
}
fn from_bytes(buf: Bytes) -> Result<Self, serde_json::Error> {
let val = serde_json::from_slice(&buf)?;
Ok(Self::Parsed(val))
}
// TODO: error handling
pub async fn parsed(self) -> Web3ProxyResult<ParsedResponse<T>> {
match self {
Self::Parsed(resp) => Ok(resp),
Self::Stream(resp) => resp.read().await,
}
}
pub fn num_bytes(&self) -> usize {
match self {
Self::Parsed(response) => serde_json::to_string(response)
.expect("this should always serialize")
.len(),
Self::Stream(response) => match response.response.content_length() {
Some(len) => len as usize,
None => 0,
},
}
}
pub fn set_id(&mut self, id: Box<RawValue>) {
match self {
SingleResponse::Parsed(x) => {
x.id = id;
}
SingleResponse::Stream(..) => {
// stream responses will hopefully always have the right id already because we pass the orignal id all the way from the front to the back
}
}
}
}
impl<T> From<ParsedResponse<T>> for SingleResponse<T> {
fn from(response: ParsedResponse<T>) -> Self {
Self::Parsed(response)
}
}
impl<T> IntoResponse for SingleResponse<T>
where
T: Serialize,
{
fn into_response(self) -> axum::response::Response {
match self {
Self::Parsed(resp) => Json(resp).into_response(),
Self::Stream(resp) => resp.into_response(),
}
}
}
#[derive(Debug)]
pub enum Response<T = Arc<RawValue>> {
Single(SingleResponse<T>),
Batch(Vec<ParsedResponse<T>>),
}
impl Response<Arc<RawValue>> {
pub async fn to_json_string(self) -> Web3ProxyResult<String> {
let x = match self {
Self::Single(resp) => {
// TODO: handle streaming differently?
let parsed = resp.parsed().await?;
serde_json::to_string(&parsed)
}
Self::Batch(resps) => serde_json::to_string(&resps),
};
let x = x.expect("to_string should always work");
Ok(x)
}
}
impl<T> From<ParsedResponse<T>> for Response<T> {
fn from(response: ParsedResponse<T>) -> Self {
Self::Single(SingleResponse::Parsed(response))
}
}
impl<T> IntoResponse for Response<T>
where
T: Serialize,
{
fn into_response(self) -> axum::response::Response {
match self {
Self::Single(resp) => resp.into_response(),
Self::Batch(resps) => Json(resps).into_response(),
}
}
}

View File

@ -86,12 +86,12 @@ impl<'a> JsonRpcQueryCacheKey<'a> {
}
}
// TODO: i think if we change this to Arc<JsonRpcResponseEnum<Box<RawValue>>>, we can speed things up
pub type JsonRpcResponseCache = Cache<u64, JsonRpcResponseEnum<Arc<RawValue>>>;
// TODO: i think if we change this to Arc<ForwardedResponse<Box<RawValue>>>, we can speed things up
pub type JsonRpcResponseCache = Cache<u64, ForwardedResponse<Arc<RawValue>>>;
/// TODO: we might need one that holds RawValue and one that holds serde_json::Value
#[derive(Clone, Debug)]
pub enum JsonRpcResponseEnum<R> {
pub enum ForwardedResponse<R> {
NullResult,
Result {
value: R,
@ -104,7 +104,7 @@ pub enum JsonRpcResponseEnum<R> {
}
// TODO: impl for other inner result types?
impl<R> JsonRpcResponseEnum<R> {
impl<R> ForwardedResponse<R> {
pub fn num_bytes(&self) -> u32 {
match self {
Self::NullResult => 1,
@ -122,13 +122,13 @@ impl<R> JsonRpcResponseEnum<R> {
}
}
impl<R> JsonRpcResponseEnum<Option<R>> {
impl<R> ForwardedResponse<Option<R>> {
pub fn is_null(&self) -> bool {
matches!(self, Self::NullResult | Self::Result { value: None, .. })
}
}
impl JsonRpcResponseEnum<Arc<RawValue>> {
impl ForwardedResponse<Arc<RawValue>> {
pub fn is_null(&self) -> bool {
match self {
Self::NullResult => true,
@ -138,21 +138,21 @@ impl JsonRpcResponseEnum<Arc<RawValue>> {
}
}
impl TryFrom<Web3ProxyResult<jsonrpc::SingleResponse>> for JsonRpcResponseEnum<Arc<RawValue>> {
impl TryFrom<Web3ProxyResult<jsonrpc::SingleResponse>> for ForwardedResponse<Arc<RawValue>> {
type Error = Web3ProxyError;
fn try_from(response: Web3ProxyResult<jsonrpc::SingleResponse>) -> Result<Self, Self::Error> {
match response {
Ok(jsonrpc::SingleResponse::Parsed(parsed)) => match parsed.payload {
jsonrpc::Payload::Success { result } => {
jsonrpc::ResponsePayload::Success { result } => {
let num_bytes = result.get().len() as u32;
Ok(JsonRpcResponseEnum::Result {
Ok(ForwardedResponse::Result {
value: result,
num_bytes,
})
}
jsonrpc::Payload::Error { error } => {
jsonrpc::ResponsePayload::Error { error } => {
let num_bytes = error.num_bytes() as u32;
Ok(JsonRpcResponseEnum::RpcError {
Ok(ForwardedResponse::RpcError {
error_data: error,
// TODO: this double serializes
num_bytes,
@ -167,7 +167,7 @@ impl TryFrom<Web3ProxyResult<jsonrpc::SingleResponse>> for JsonRpcResponseEnum<A
}
}
impl From<serde_json::Value> for JsonRpcResponseEnum<Arc<RawValue>> {
impl From<serde_json::Value> for ForwardedResponse<Arc<RawValue>> {
fn from(value: serde_json::Value) -> Self {
let value = RawValue::from_string(value.to_string()).unwrap();
@ -175,7 +175,7 @@ impl From<serde_json::Value> for JsonRpcResponseEnum<Arc<RawValue>> {
}
}
impl From<Arc<RawValue>> for JsonRpcResponseEnum<Arc<RawValue>> {
impl From<Arc<RawValue>> for ForwardedResponse<Arc<RawValue>> {
fn from(value: Arc<RawValue>) -> Self {
let num_bytes = value.get().len();
@ -185,7 +185,7 @@ impl From<Arc<RawValue>> for JsonRpcResponseEnum<Arc<RawValue>> {
}
}
impl From<Box<RawValue>> for JsonRpcResponseEnum<Arc<RawValue>> {
impl From<Box<RawValue>> for ForwardedResponse<Arc<RawValue>> {
fn from(value: Box<RawValue>) -> Self {
let num_bytes = value.get().len();
@ -197,7 +197,7 @@ impl From<Box<RawValue>> for JsonRpcResponseEnum<Arc<RawValue>> {
}
}
impl TryFrom<Web3ProxyError> for JsonRpcResponseEnum<Arc<RawValue>> {
impl TryFrom<Web3ProxyError> for ForwardedResponse<Arc<RawValue>> {
type Error = Web3ProxyError;
fn try_from(value: Web3ProxyError) -> Result<Self, Self::Error> {
@ -206,7 +206,7 @@ impl TryFrom<Web3ProxyError> for JsonRpcResponseEnum<Arc<RawValue>> {
Ok(x) => Ok(x.into()),
Err(..) => Err(err.into()),
},
Web3ProxyError::NullJsonRpcResult => Ok(JsonRpcResponseEnum::NullResult),
Web3ProxyError::NullJsonRpcResult => Ok(ForwardedResponse::NullResult),
Web3ProxyError::JsonRpcResponse(x) => Ok(x),
Web3ProxyError::JsonRpcErrorData(err) => Ok(err.into()),
err => Err(err),
@ -214,7 +214,7 @@ impl TryFrom<Web3ProxyError> for JsonRpcResponseEnum<Arc<RawValue>> {
}
}
impl TryFrom<Result<Arc<RawValue>, Web3ProxyError>> for JsonRpcResponseEnum<Arc<RawValue>> {
impl TryFrom<Result<Arc<RawValue>, Web3ProxyError>> for ForwardedResponse<Arc<RawValue>> {
type Error = Web3ProxyError;
fn try_from(value: Result<Arc<RawValue>, Web3ProxyError>) -> Result<Self, Self::Error> {
@ -228,7 +228,7 @@ impl TryFrom<Result<Arc<RawValue>, Web3ProxyError>> for JsonRpcResponseEnum<Arc<
}
}
}
impl TryFrom<Result<Box<RawValue>, Web3ProxyError>> for JsonRpcResponseEnum<Arc<RawValue>> {
impl TryFrom<Result<Box<RawValue>, Web3ProxyError>> for ForwardedResponse<Arc<RawValue>> {
type Error = Web3ProxyError;
fn try_from(value: Result<Box<RawValue>, Web3ProxyError>) -> Result<Self, Self::Error> {
@ -243,7 +243,7 @@ impl TryFrom<Result<Box<RawValue>, Web3ProxyError>> for JsonRpcResponseEnum<Arc<
}
}
impl<R> From<JsonRpcErrorData> for JsonRpcResponseEnum<R> {
impl<R> From<JsonRpcErrorData> for ForwardedResponse<R> {
fn from(value: JsonRpcErrorData) -> Self {
// TODO: wrap the error in a complete response?
let num_bytes = serde_json::to_string(&value).unwrap().len();
@ -317,7 +317,7 @@ impl<'a> TryFrom<&'a WsClientError> for JsonRpcErrorData {
pub struct JsonRpcResponseWeigher(pub u32);
impl JsonRpcResponseWeigher {
pub fn weigh<K, R>(&self, _key: &K, value: &JsonRpcResponseEnum<R>) -> u32 {
pub fn weigh<K, R>(&self, _key: &K, value: &ForwardedResponse<R>) -> u32 {
let x = value.num_bytes();
if x > self.0 {
@ -331,7 +331,7 @@ impl JsonRpcResponseWeigher {
#[cfg(test)]
mod tests {
use super::JsonRpcResponseEnum;
use super::ForwardedResponse;
use crate::response_cache::JsonRpcResponseWeigher;
use moka::future::{Cache, CacheBuilder};
use serde_json::value::RawValue;
@ -344,28 +344,28 @@ mod tests {
let weigher = JsonRpcResponseWeigher(max_item_weight);
let small_data: JsonRpcResponseEnum<Arc<RawValue>> = JsonRpcResponseEnum::Result {
let small_data: ForwardedResponse<Arc<RawValue>> = ForwardedResponse::Result {
value: Box::<RawValue>::default().into(),
num_bytes: max_item_weight / 2,
};
assert_eq!(weigher.weigh(&(), &small_data), max_item_weight / 2);
let max_sized_data: JsonRpcResponseEnum<Arc<RawValue>> = JsonRpcResponseEnum::Result {
let max_sized_data: ForwardedResponse<Arc<RawValue>> = ForwardedResponse::Result {
value: Box::<RawValue>::default().into(),
num_bytes: max_item_weight,
};
assert_eq!(weigher.weigh(&(), &max_sized_data), max_item_weight);
let oversized_data: JsonRpcResponseEnum<Arc<RawValue>> = JsonRpcResponseEnum::Result {
let oversized_data: ForwardedResponse<Arc<RawValue>> = ForwardedResponse::Result {
value: Box::<RawValue>::default().into(),
num_bytes: max_item_weight * 2,
};
assert_eq!(weigher.weigh(&(), &oversized_data), u32::MAX);
let test_cache: Cache<u32, JsonRpcResponseEnum<Arc<RawValue>>> =
let test_cache: Cache<u32, ForwardedResponse<Arc<RawValue>>> =
CacheBuilder::new(weight_capacity)
.weigher(move |k, v| weigher.weigh(k, v))
.time_to_live(Duration::from_secs(2))

View File

@ -490,9 +490,9 @@ impl Web3Rpcs {
let parsed = response.parsed().await?;
match parsed.payload {
jsonrpc::Payload::Success { result } => Ok(result),
jsonrpc::ResponsePayload::Success { result } => Ok(result),
// TODO: confirm this error type is correct
jsonrpc::Payload::Error { error } => Err(error.into()),
jsonrpc::ResponsePayload::Error { error } => Err(error.into()),
}
}

View File

@ -1338,8 +1338,8 @@ impl Web3Rpc {
let response = handle.request().await?;
let parsed = response.parsed().await?;
match parsed.payload {
jsonrpc::Payload::Success { result } => Ok(result),
jsonrpc::Payload::Error { error } => Err(error.into()),
jsonrpc::ResponsePayload::Success { result } => Ok(result),
jsonrpc::ResponsePayload::Error { error } => Err(error.into()),
}
}
}

View File

@ -2,7 +2,7 @@ use super::one::Web3Rpc;
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::authorization::{Authorization, AuthorizationType, Web3Request};
use crate::globals::{global_db_conn, DB_CONN};
use crate::jsonrpc::{self, JsonRpcErrorData, JsonRpcResultData, Payload};
use crate::jsonrpc::{self, JsonRpcErrorData, JsonRpcResultData, ResponsePayload};
use anyhow::Context;
use chrono::Utc;
use derive_more::From;
@ -339,7 +339,7 @@ impl OpenRequestHandle {
// TODO: counters for errors vs jsonrpc vs success?
let response_is_success = match &response {
Ok(jsonrpc::SingleResponse::Parsed(x)) => {
matches!(&x.payload, Payload::Success { .. })
matches!(&x.payload, ResponsePayload::Success { .. })
}
Ok(jsonrpc::SingleResponse::Stream(..)) => true,
Err(_) => false,
@ -367,8 +367,8 @@ impl OpenRequestHandle {
let response_type: ResponseType = match &response {
Ok(jsonrpc::SingleResponse::Parsed(x)) => match &x.payload {
Payload::Success { .. } => unreachable!(),
Payload::Error { error } => {
ResponsePayload::Success { .. } => unreachable!(),
ResponsePayload::Error { error } => {
trace!(?error, "jsonrpc error data");
if error.message.starts_with("execution reverted") {