maybe getting rid of this arc will fix streaming taking twice
This commit is contained in:
parent
11e50042c6
commit
042707eca2
@ -7,7 +7,7 @@ use crate::frontend::authorization::Authorization;
|
||||
use crate::globals::{global_db_conn, DatabaseError, APP, DB_CONN, DB_REPLICA};
|
||||
use crate::jsonrpc::{
|
||||
self, JsonRpcErrorData, JsonRpcParams, JsonRpcRequestEnum, JsonRpcResultData, LooseId,
|
||||
SingleRequest, SingleResponse, ValidatedRequest,
|
||||
ParsedResponse, SingleRequest, SingleResponse, ValidatedRequest,
|
||||
};
|
||||
use crate::relational_db::{connect_db, migrate_db};
|
||||
use crate::response_cache::{ForwardedResponse, JsonRpcResponseCache, JsonRpcResponseWeigher};
|
||||
@ -47,7 +47,7 @@ use std::time::Duration;
|
||||
use tokio::select;
|
||||
use tokio::sync::{broadcast, mpsc, oneshot, watch, Semaphore};
|
||||
use tokio::task::{yield_now, JoinHandle};
|
||||
use tokio::time::{sleep, sleep_until, timeout, timeout_at, Instant};
|
||||
use tokio::time::{sleep, sleep_until, timeout_at, Instant};
|
||||
use tracing::{error, info, trace, warn};
|
||||
|
||||
// TODO: make this customizable?
|
||||
@ -1168,30 +1168,14 @@ impl App {
|
||||
let (code, response) = match last_response {
|
||||
Ok(response_data) => {
|
||||
web3_request.error_response.store(false, Ordering::Relaxed);
|
||||
|
||||
// TODO: is it true that all jsonrpc errors are user errors?
|
||||
web3_request
|
||||
.user_error_response
|
||||
.store(false, Ordering::Relaxed);
|
||||
.store(response_data.is_jsonrpc_err(), Ordering::Relaxed);
|
||||
|
||||
(StatusCode::OK, response_data)
|
||||
}
|
||||
Err(err @ Web3ProxyError::NullJsonRpcResult) => {
|
||||
web3_request.error_response.store(false, Ordering::Relaxed);
|
||||
web3_request
|
||||
.user_error_response
|
||||
.store(false, Ordering::Relaxed);
|
||||
|
||||
err.as_json_response_parts(web3_request.id())
|
||||
}
|
||||
Err(Web3ProxyError::JsonRpcResponse(response_data)) => {
|
||||
web3_request.error_response.store(false, Ordering::Relaxed);
|
||||
web3_request
|
||||
.user_error_response
|
||||
.store(response_data.is_error(), Ordering::Relaxed);
|
||||
|
||||
let response =
|
||||
jsonrpc::ParsedResponse::from_response_data(response_data, web3_request.id());
|
||||
(StatusCode::OK, response.into())
|
||||
}
|
||||
Err(err) => {
|
||||
// max tries exceeded. return the error
|
||||
|
||||
@ -1628,11 +1612,11 @@ impl App {
|
||||
|
||||
let x: SingleResponse = if let Some(data) = self.jsonrpc_response_cache.get(&cache_key).await {
|
||||
// it was cached! easy!
|
||||
// TODO: wait. this currently panics. why?
|
||||
jsonrpc::ParsedResponse::from_response_data(data, web3_request.id()).into()
|
||||
} else if self.jsonrpc_response_failed_cache_keys.contains_key(&cache_key) {
|
||||
// this is a cache_key that we know won't cache
|
||||
// NOTICE! We do **NOT** use get which means the key's hotness is not updated. we don't use time-to-idler here so thats fine. but be careful if that changes
|
||||
// this is a request that we have previously failed to cache. don't try the cache again
|
||||
// TODO: is "contains_key" okay, or do we need "get($cache_key).await"?
|
||||
// TODO: DRY. we do this timeout and try_proxy_connection below, too.
|
||||
timeout_at(
|
||||
web3_request.expire_at(),
|
||||
self.balanced_rpcs
|
||||
@ -1641,109 +1625,59 @@ impl App {
|
||||
)
|
||||
).await??
|
||||
} else {
|
||||
// TODO: acquire a semaphore from a map with the cache key as the key
|
||||
// TODO: try it, if that fails, then we are already running. wait until the semaphore completes and then run on. they will all go only one at a time though
|
||||
// TODO: if we got the semaphore, do the try_get_with
|
||||
// TODO: if the response is too big to cache mark the cache_key as not cacheable. maybe CacheMode can check that cache?
|
||||
// we used to have a semaphore here, but its faster to just allow duplicate requests while the first is still in flight
|
||||
// we might do some duplicate requests here, but it seems worth it to get rid of the Arc errors.
|
||||
let response_data = timeout_at(
|
||||
web3_request.expire_at(),
|
||||
self.balanced_rpcs
|
||||
.try_proxy_connection::<Arc<RawValue>>(
|
||||
web3_request,
|
||||
)
|
||||
).await?;
|
||||
|
||||
let s = self.jsonrpc_response_semaphores.get_with(cache_key, async move {
|
||||
Arc::new(Semaphore::new(1))
|
||||
}).await;
|
||||
match response_data {
|
||||
Ok(mut x) => {
|
||||
match &x {
|
||||
SingleResponse::Parsed(x) => {
|
||||
// TODO: don't serialize here! we should already know the size!
|
||||
let len = serde_json::to_string(&x).unwrap().len();
|
||||
|
||||
// TODO: don't always do 1 second. use the median request latency instead
|
||||
let mut x = match timeout(Duration::from_secs(1), s.acquire_owned()).await {
|
||||
Err(_) => {
|
||||
// TODO: should we try to cache this? whatever has the semaphore //should// handle that for us
|
||||
timeout_at(
|
||||
web3_request.expire_at(),
|
||||
self.balanced_rpcs
|
||||
.try_proxy_connection::<Arc<RawValue>>(
|
||||
web3_request,
|
||||
)
|
||||
).await??
|
||||
}
|
||||
Ok(_p) => {
|
||||
// we got the permit! we are either first, or we were waiting a short time to get it in which case this response should be cached
|
||||
// TODO: clone less? its spawned so i don't think we can
|
||||
let f = {
|
||||
let app = self.clone();
|
||||
let web3_request = web3_request.clone();
|
||||
if len <= max_response_cache_bytes {
|
||||
let cached = ForwardedResponse::from(x.payload.clone());
|
||||
|
||||
async move {
|
||||
app
|
||||
.jsonrpc_response_cache
|
||||
.try_get_with::<_, Web3ProxyError>(cache_key, async {
|
||||
// TODO: dynamic timeout based on whats left on web3_request
|
||||
let response_data = timeout_at(web3_request.expire_at(), app.balanced_rpcs
|
||||
.try_proxy_connection::<Arc<RawValue>>(
|
||||
&web3_request,
|
||||
)).await;
|
||||
|
||||
match response_data {
|
||||
Ok(response_data) => {
|
||||
if !web3_request.cache_jsonrpc_errors() && let Err(err) = response_data {
|
||||
// if we are not supposed to cache jsonrpc errors,
|
||||
// then we must not convert Provider errors into a ForwardedResponse
|
||||
// return all the errors now. moka will not cache Err results
|
||||
Err(err)
|
||||
} else {
|
||||
// convert jsonrpc errors into ForwardedResponse, but leave the rest as errors
|
||||
let response_data: ForwardedResponse<Arc<RawValue>> = response_data.try_into()?;
|
||||
|
||||
if response_data.is_null() {
|
||||
// don't ever cache "null" as a success. its too likely to be a problem
|
||||
Err(Web3ProxyError::NullJsonRpcResult)
|
||||
} else if response_data.num_bytes() > max_response_cache_bytes {
|
||||
// don't cache really large requests
|
||||
// TODO: emit a stat
|
||||
Err(Web3ProxyError::JsonRpcResponse(response_data))
|
||||
} else {
|
||||
// TODO: response data should maybe be Arc<ForwardedResponse<Box<RawValue>>>, but that's more work
|
||||
Ok(response_data)
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => Err(Web3ProxyError::from(err)),
|
||||
}
|
||||
}).await
|
||||
}
|
||||
};
|
||||
|
||||
// this is spawned so that if the client disconnects, the app keeps polling the future with a lock inside the moka cache
|
||||
// TODO: is this expect actually safe!? could there be a background process that still has the arc?
|
||||
let x = match tokio::spawn(f).await? {
|
||||
Ok(response_data) => Ok(jsonrpc::ParsedResponse::from_response_data(response_data, Default::default()).into()),
|
||||
Err(err) => {
|
||||
self.jsonrpc_response_failed_cache_keys.insert(cache_key, ()).await;
|
||||
|
||||
if let Web3ProxyError::StreamResponse(x) = err.as_ref() {
|
||||
if let Some(x) = x.lock().take() {
|
||||
Ok(jsonrpc::SingleResponse::Stream(x))
|
||||
} else {
|
||||
let method = web3_request.inner.method();
|
||||
let params = web3_request.inner.params();
|
||||
let err: Web3ProxyError = anyhow::anyhow!("stream was already taken. please report this in Discord. method={:?} params={:?}", method, params).into();
|
||||
|
||||
Err(Arc::new(err))
|
||||
}
|
||||
self.jsonrpc_response_cache.insert(cache_key, cached).await;
|
||||
} else {
|
||||
Err(err)
|
||||
self.jsonrpc_response_failed_cache_keys.insert(cache_key, ()).await;
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
SingleResponse::Stream(..) => {
|
||||
self.jsonrpc_response_failed_cache_keys.insert(cache_key, ()).await;
|
||||
}
|
||||
}
|
||||
|
||||
let mut x = x?;
|
||||
|
||||
// clear the id. theres no point including it in our cached response
|
||||
x.set_id(Default::default());
|
||||
x.set_id(web3_request.id());
|
||||
|
||||
x
|
||||
}
|
||||
};
|
||||
Err(err) => {
|
||||
if web3_request.cache_jsonrpc_errors() {
|
||||
// we got an error, but we are supposed to cache jsonrpc errors.
|
||||
let x: Result<ForwardedResponse<Arc<RawValue>>, Web3ProxyError> = err.try_into();
|
||||
|
||||
x.set_id(web3_request.id());
|
||||
if x.is_err() {
|
||||
// we still have an Err. it must not have been a jsonrpc error
|
||||
self.jsonrpc_response_failed_cache_keys.insert(cache_key, ()).await;
|
||||
}
|
||||
|
||||
x
|
||||
// TODO: needing multiple into/try_into/from must be inefficient. investigate this
|
||||
ParsedResponse::from_response_data(x?, web3_request.id()).into()
|
||||
} else {
|
||||
// we got an error, and we are not supposed to cache jsonrpc errors. exit early
|
||||
self.jsonrpc_response_failed_cache_keys.insert(cache_key, ()).await;
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
x
|
||||
|
@ -123,8 +123,8 @@ impl App {
|
||||
let response_str = serde_json::to_string(&response_json)
|
||||
.expect("this should always be valid json");
|
||||
|
||||
// we could use SingleForwardedResponseEnum::num_bytes() here, but since we already have the string, this is easier
|
||||
let response_bytes = response_str.len();
|
||||
// we could use ForwardedResponse::num_bytes() here, but since we already have the string, this is easier
|
||||
let response_bytes = response_str.len() as u64;
|
||||
|
||||
// TODO: do clients support binary messages?
|
||||
// TODO: can we check a content type header?
|
||||
@ -212,8 +212,8 @@ impl App {
|
||||
let response_str = serde_json::to_string(&response_json)
|
||||
.expect("this should always be valid json");
|
||||
|
||||
// we could use SingleForwardedResponseEnum::num_bytes() here, but since we already have the string, this is easier
|
||||
let response_bytes = response_str.len();
|
||||
// we could use ForwardedResponse::num_bytes() here, but since we already have the string, this is easier
|
||||
let response_bytes = response_str.len() as u64;
|
||||
|
||||
subscription_web3_request.add_response(response_bytes);
|
||||
|
||||
|
@ -1,7 +1,7 @@
|
||||
//! Utlities for logging errors for admins and displaying errors to users.
|
||||
|
||||
use crate::frontend::authorization::Authorization;
|
||||
use crate::jsonrpc::{self, JsonRpcErrorData, ParsedResponse};
|
||||
use crate::jsonrpc::{self, JsonRpcErrorData, ParsedResponse, StreamResponse};
|
||||
use crate::response_cache::ForwardedResponse;
|
||||
use crate::rpcs::provider::EthersHttpProvider;
|
||||
use axum::extract::rejection::JsonRejection;
|
||||
@ -19,7 +19,6 @@ use http::header::InvalidHeaderValue;
|
||||
use http::uri::InvalidUri;
|
||||
use ipnet::AddrParseError;
|
||||
use migration::sea_orm::DbErr;
|
||||
use parking_lot::Mutex;
|
||||
use redis_rate_limiter::redis::RedisError;
|
||||
use redis_rate_limiter::RedisPoolError;
|
||||
use reqwest::header::ToStrError;
|
||||
@ -136,16 +135,6 @@ pub enum Web3ProxyError {
|
||||
#[from(ignore)]
|
||||
MethodNotFound(Cow<'static, str>),
|
||||
NoVolatileRedisDatabase,
|
||||
/// make it easy to skip caching large results
|
||||
#[error(ignore)]
|
||||
#[display(fmt = "{:?}", _0)]
|
||||
JsonRpcResponse(ForwardedResponse<Arc<RawValue>>),
|
||||
/// make it easy to skip caching streaming results
|
||||
#[error(ignore)]
|
||||
#[display(fmt = "{:?}", _0)]
|
||||
StreamResponse(Mutex<Option<jsonrpc::StreamResponse<Arc<RawValue>>>>),
|
||||
/// make it easy to skip caching null results
|
||||
NullJsonRpcResult,
|
||||
OriginRequired,
|
||||
#[error(ignore)]
|
||||
#[from(ignore)]
|
||||
@ -171,6 +160,9 @@ pub enum Web3ProxyError {
|
||||
/// simple way to return an error message to the user and an anyhow to our logs
|
||||
#[display(fmt = "{}, {}, {:?}", _0, _1, _2)]
|
||||
StatusCode(StatusCode, Cow<'static, str>, Option<serde_json::Value>),
|
||||
#[display(fmt = "streaming response")]
|
||||
#[error(ignore)]
|
||||
StreamResponse(StreamResponse<Arc<RawValue>>),
|
||||
#[cfg(feature = "stripe")]
|
||||
StripeWebhookError(stripe::WebhookError),
|
||||
/// TODO: what should be attached to the timout?
|
||||
@ -828,17 +820,6 @@ impl Web3ProxyError {
|
||||
},
|
||||
)
|
||||
}
|
||||
Self::JsonRpcResponse(response_enum) => {
|
||||
// TODO: shame we have to clone, but its an Arc so its not terrible
|
||||
return (StatusCode::OK, response_enum.clone());
|
||||
}
|
||||
Self::StreamResponse(_resp) => {
|
||||
// TODO: better way of doing this?
|
||||
unreachable!("stream is pulled out, not used here");
|
||||
}
|
||||
Self::NullJsonRpcResult => {
|
||||
return (StatusCode::OK, ForwardedResponse::NullResult);
|
||||
}
|
||||
Self::OriginRequired => {
|
||||
trace!("OriginRequired");
|
||||
(
|
||||
@ -1046,6 +1027,10 @@ impl Web3ProxyError {
|
||||
},
|
||||
)
|
||||
}
|
||||
Self::StreamResponse(..) => {
|
||||
// TODO: should it really?
|
||||
unimplemented!("streaming should be handled elsewhere");
|
||||
}
|
||||
#[cfg(feature = "stripe")]
|
||||
Self::StripeWebhookError(err) => {
|
||||
trace!(?err, "StripeWebhookError");
|
||||
|
@ -179,27 +179,21 @@ pub enum ResponseOrBytes<'a> {
|
||||
Json(&'a serde_json::Value),
|
||||
Response(&'a jsonrpc::SingleResponse),
|
||||
Error(&'a Web3ProxyError),
|
||||
Bytes(usize),
|
||||
}
|
||||
|
||||
impl<'a> From<u64> for ResponseOrBytes<'a> {
|
||||
fn from(value: u64) -> Self {
|
||||
Self::Bytes(value as usize)
|
||||
}
|
||||
Bytes(u64),
|
||||
}
|
||||
|
||||
impl ResponseOrBytes<'_> {
|
||||
pub fn num_bytes(&self) -> usize {
|
||||
pub fn num_bytes(&self) -> u64 {
|
||||
match self {
|
||||
Self::Json(x) => serde_json::to_string(x)
|
||||
.expect("this should always serialize")
|
||||
.len(),
|
||||
.len() as u64,
|
||||
Self::Response(x) => x.num_bytes(),
|
||||
Self::Bytes(num_bytes) => *num_bytes,
|
||||
Self::Error(x) => {
|
||||
let (_, x) = x.as_response_parts();
|
||||
|
||||
x.num_bytes() as usize
|
||||
x.num_bytes()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -397,8 +397,8 @@ async fn websocket_proxy_web3_rpc(
|
||||
let response =
|
||||
jsonrpc::ParsedResponse::from_value(json!(partial_response), web3_request.id());
|
||||
|
||||
// TODO: better way of passing in ParsedResponse
|
||||
let response = jsonrpc::SingleResponse::Parsed(response);
|
||||
|
||||
web3_request.add_response(&response);
|
||||
let response = response.parsed().await.expect("Response already parsed");
|
||||
|
||||
|
@ -3,7 +3,7 @@ use std::borrow::Cow;
|
||||
|
||||
// TODO: impl Error on this?
|
||||
/// All jsonrpc errors use this structure
|
||||
#[derive(Debug, Deserialize, Serialize, Clone)]
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
pub struct JsonRpcErrorData {
|
||||
/// The error code
|
||||
pub code: i64,
|
||||
@ -15,16 +15,16 @@ pub struct JsonRpcErrorData {
|
||||
}
|
||||
|
||||
impl JsonRpcErrorData {
|
||||
pub fn num_bytes(&self) -> usize {
|
||||
pub fn num_bytes(&self) -> u64 {
|
||||
serde_json::to_string(self)
|
||||
.expect("should always serialize")
|
||||
.len()
|
||||
.len() as u64
|
||||
}
|
||||
|
||||
pub fn is_retryable(&self) -> bool {
|
||||
// TODO: move stuff from request to here
|
||||
todo!()
|
||||
}
|
||||
// pub fn is_retryable(&self) -> bool {
|
||||
// // TODO: move stuff from request to here
|
||||
// todo!()
|
||||
// }
|
||||
}
|
||||
|
||||
impl From<&'static str> for JsonRpcErrorData {
|
||||
|
@ -554,7 +554,7 @@ impl ValidatedRequest {
|
||||
// TODO: fetch? set? should it be None in a Mutex? or a OnceCell?
|
||||
let response = response.into();
|
||||
|
||||
let num_bytes = response.num_bytes() as u64;
|
||||
let num_bytes = response.num_bytes();
|
||||
|
||||
self.response_bytes
|
||||
.fetch_add(num_bytes, atomic::Ordering::Relaxed);
|
||||
|
@ -6,6 +6,7 @@ use axum::body::StreamBody;
|
||||
use axum::response::IntoResponse;
|
||||
use axum::Json;
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use derivative::Derivative;
|
||||
use futures_util::stream::{self, StreamExt};
|
||||
use futures_util::TryStreamExt;
|
||||
use serde::{de, Deserialize, Serialize};
|
||||
@ -19,7 +20,7 @@ pub trait JsonRpcResultData = serde::Serialize + serde::de::DeserializeOwned + f
|
||||
|
||||
/// TODO: borrow values to avoid allocs if possible
|
||||
/// TODO: lots of overlap with `SingleForwardedResponse`
|
||||
#[derive(Debug, Serialize)]
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
pub struct ParsedResponse<T = Arc<RawValue>> {
|
||||
pub jsonrpc: String,
|
||||
pub id: Box<RawValue>,
|
||||
@ -193,30 +194,32 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
#[serde(untagged)]
|
||||
pub enum ResponsePayload<T> {
|
||||
Success { result: T },
|
||||
Error { error: JsonRpcErrorData },
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Derivative)]
|
||||
#[derivative(Debug)]
|
||||
pub struct StreamResponse<T> {
|
||||
_t: PhantomData<T>,
|
||||
buffer: Bytes,
|
||||
num_bytes: Option<u64>,
|
||||
#[derivative(Debug = "ignore")]
|
||||
response: reqwest::Response,
|
||||
web3_request: Arc<ValidatedRequest>,
|
||||
}
|
||||
|
||||
impl<T> StreamResponse<T> {
|
||||
// TODO: error handing
|
||||
pub async fn read(self) -> Web3ProxyResult<ParsedResponse<T>>
|
||||
where
|
||||
T: de::DeserializeOwned,
|
||||
{
|
||||
let mut buffer = BytesMut::with_capacity(self.buffer.len());
|
||||
buffer.extend_from_slice(&self.buffer);
|
||||
buffer.extend_from_slice(&self.response.bytes().await?);
|
||||
buffer.extend(self.buffer);
|
||||
buffer.extend(self.response.bytes().await?);
|
||||
let parsed = serde_json::from_slice(&buffer)?;
|
||||
Ok(parsed)
|
||||
}
|
||||
@ -227,7 +230,7 @@ impl<T> IntoResponse for StreamResponse<T> {
|
||||
let stream = stream::once(async { Ok::<_, reqwest::Error>(self.buffer) })
|
||||
.chain(self.response.bytes_stream())
|
||||
.map_ok(move |x| {
|
||||
let len = x.len();
|
||||
let len = x.len() as u64;
|
||||
|
||||
self.web3_request.add_response(len);
|
||||
|
||||
@ -241,6 +244,8 @@ impl<T> IntoResponse for StreamResponse<T> {
|
||||
#[derive(Debug)]
|
||||
pub enum SingleResponse<T = Arc<RawValue>> {
|
||||
/// TODO: save the size here so we don't have to serialize again
|
||||
/// TODO: before doing that, make sure we don't swap back and forth between parsed and stream and single and forwarded and end up serializing too many times
|
||||
/// TODO: should this be a ForwardedResponse instead of ParsedResponse
|
||||
Parsed(ParsedResponse<T>),
|
||||
Stream(StreamResponse<T>),
|
||||
}
|
||||
@ -249,6 +254,13 @@ impl<T> SingleResponse<T>
|
||||
where
|
||||
T: de::DeserializeOwned + Serialize,
|
||||
{
|
||||
pub fn is_jsonrpc_err(&self) -> bool {
|
||||
match self {
|
||||
Self::Parsed(resp, ..) => matches!(resp.payload, ResponsePayload::Error { .. }),
|
||||
Self::Stream(..) => false,
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: threshold from configs
|
||||
// TODO: error handling
|
||||
// TODO: if a large stream's response's initial chunk "error" then we should buffer it
|
||||
@ -261,19 +273,21 @@ where
|
||||
// short
|
||||
Some(len) if len <= nbytes => Ok(Self::from_bytes(response.bytes().await?)?),
|
||||
// long
|
||||
Some(_) => Ok(Self::Stream(StreamResponse {
|
||||
Some(len) => Ok(Self::Stream(StreamResponse {
|
||||
_t: PhantomData::<T>,
|
||||
buffer: Bytes::new(),
|
||||
num_bytes: Some(len),
|
||||
response,
|
||||
web3_request: web3_request.clone(),
|
||||
})),
|
||||
// unknown length. maybe compressed. maybe streaming. maybe both
|
||||
None => {
|
||||
let mut buffer = BytesMut::new();
|
||||
// todo: this might over-allocate, but it's probably fine
|
||||
let mut buffer = BytesMut::with_capacity(nbytes as usize);
|
||||
while (buffer.len() as u64) < nbytes {
|
||||
match response.chunk().await? {
|
||||
Some(chunk) => {
|
||||
buffer.extend_from_slice(&chunk);
|
||||
buffer.extend(chunk);
|
||||
}
|
||||
None => {
|
||||
// it was short
|
||||
@ -287,6 +301,7 @@ where
|
||||
Ok(Self::Stream(StreamResponse {
|
||||
_t: PhantomData::<T>,
|
||||
buffer,
|
||||
num_bytes: None,
|
||||
response,
|
||||
web3_request: web3_request.clone(),
|
||||
}))
|
||||
@ -302,26 +317,23 @@ where
|
||||
// TODO: error handling
|
||||
pub async fn parsed(self) -> Web3ProxyResult<ParsedResponse<T>> {
|
||||
match self {
|
||||
Self::Parsed(resp) => Ok(resp),
|
||||
Self::Stream(resp) => resp.read().await,
|
||||
Self::Parsed(resp, ..) => Ok(resp),
|
||||
Self::Stream(resp, ..) => resp.read().await,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn num_bytes(&self) -> usize {
|
||||
pub fn num_bytes(&self) -> u64 {
|
||||
match self {
|
||||
Self::Parsed(response) => serde_json::to_string(response)
|
||||
.expect("this should always serialize")
|
||||
.len(),
|
||||
Self::Stream(response) => match response.response.content_length() {
|
||||
Some(len) => len as usize,
|
||||
None => 0,
|
||||
},
|
||||
.len() as u64,
|
||||
Self::Stream(response) => response.num_bytes.unwrap_or(0),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_id(&mut self, id: Box<RawValue>) {
|
||||
match self {
|
||||
SingleResponse::Parsed(x) => {
|
||||
SingleResponse::Parsed(x, ..) => {
|
||||
x.id = id;
|
||||
}
|
||||
SingleResponse::Stream(..) => {
|
||||
@ -343,8 +355,8 @@ where
|
||||
{
|
||||
fn into_response(self) -> axum::response::Response {
|
||||
match self {
|
||||
Self::Parsed(resp) => Json(resp).into_response(),
|
||||
Self::Stream(resp) => resp.into_response(),
|
||||
Self::Parsed(resp, ..) => Json(resp).into_response(),
|
||||
Self::Stream(resp, ..) => resp.into_response(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2,7 +2,7 @@ use crate::{
|
||||
block_number::{BlockNumAndHash, CacheMode},
|
||||
errors::{Web3ProxyError, Web3ProxyResult},
|
||||
frontend::authorization::RequestOrMethod,
|
||||
jsonrpc::{self, JsonRpcErrorData},
|
||||
jsonrpc::{self, JsonRpcErrorData, ResponsePayload},
|
||||
};
|
||||
use derive_more::From;
|
||||
use ethers::{
|
||||
@ -11,7 +11,6 @@ use ethers::{
|
||||
};
|
||||
use hashbrown::hash_map::DefaultHashBuilder;
|
||||
use moka::future::Cache;
|
||||
use parking_lot::Mutex;
|
||||
use serde_json::value::RawValue;
|
||||
use std::{
|
||||
hash::{BuildHasher, Hash, Hasher},
|
||||
@ -86,26 +85,25 @@ impl<'a> JsonRpcQueryCacheKey<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: i think if we change this to Arc<ForwardedResponse<Box<RawValue>>>, we can speed things up
|
||||
pub type JsonRpcResponseCache = Cache<u64, ForwardedResponse<Arc<RawValue>>>;
|
||||
|
||||
/// TODO: we might need one that holds RawValue and one that holds serde_json::Value
|
||||
/// TODO: think about this more. there is a lot of overlap with ParsedResponse
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum ForwardedResponse<R> {
|
||||
pub enum ForwardedResponse<T> {
|
||||
NullResult,
|
||||
Result {
|
||||
value: R,
|
||||
num_bytes: u32,
|
||||
value: T,
|
||||
num_bytes: u64,
|
||||
},
|
||||
RpcError {
|
||||
error_data: JsonRpcErrorData,
|
||||
num_bytes: u32,
|
||||
num_bytes: u64,
|
||||
},
|
||||
}
|
||||
|
||||
// TODO: impl for other inner result types?
|
||||
impl<R> ForwardedResponse<R> {
|
||||
pub fn num_bytes(&self) -> u32 {
|
||||
pub fn num_bytes(&self) -> u64 {
|
||||
match self {
|
||||
Self::NullResult => 1,
|
||||
Self::Result { num_bytes, .. } => *num_bytes,
|
||||
@ -138,31 +136,52 @@ impl ForwardedResponse<Arc<RawValue>> {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ResponsePayload<Arc<RawValue>>> for ForwardedResponse<Arc<RawValue>> {
|
||||
fn from(value: ResponsePayload<Arc<RawValue>>) -> Self {
|
||||
match value {
|
||||
ResponsePayload::Success { result } => {
|
||||
let num_bytes = result.get().len() as u64;
|
||||
|
||||
ForwardedResponse::Result {
|
||||
value: result,
|
||||
num_bytes,
|
||||
}
|
||||
}
|
||||
ResponsePayload::Error { error } => {
|
||||
let num_bytes = error.num_bytes();
|
||||
|
||||
ForwardedResponse::RpcError {
|
||||
error_data: error,
|
||||
num_bytes,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<Web3ProxyResult<jsonrpc::SingleResponse>> for ForwardedResponse<Arc<RawValue>> {
|
||||
type Error = Web3ProxyError;
|
||||
fn try_from(response: Web3ProxyResult<jsonrpc::SingleResponse>) -> Result<Self, Self::Error> {
|
||||
match response {
|
||||
Ok(jsonrpc::SingleResponse::Parsed(parsed)) => match parsed.payload {
|
||||
match response? {
|
||||
jsonrpc::SingleResponse::Parsed(parsed) => match parsed.payload {
|
||||
jsonrpc::ResponsePayload::Success { result } => {
|
||||
let num_bytes = result.get().len() as u32;
|
||||
let num_bytes = result.get().len() as u64;
|
||||
|
||||
Ok(ForwardedResponse::Result {
|
||||
value: result,
|
||||
num_bytes,
|
||||
})
|
||||
}
|
||||
jsonrpc::ResponsePayload::Error { error } => {
|
||||
let num_bytes = error.num_bytes() as u32;
|
||||
let num_bytes = error.num_bytes();
|
||||
|
||||
Ok(ForwardedResponse::RpcError {
|
||||
error_data: error,
|
||||
// TODO: this double serializes
|
||||
num_bytes,
|
||||
})
|
||||
}
|
||||
},
|
||||
Ok(jsonrpc::SingleResponse::Stream(stream)) => {
|
||||
Err(Web3ProxyError::StreamResponse(Mutex::new(Some(stream))))
|
||||
}
|
||||
Err(err) => err.try_into(),
|
||||
jsonrpc::SingleResponse::Stream(stream) => Err(Web3ProxyError::StreamResponse(stream)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -177,9 +196,7 @@ impl From<serde_json::Value> for ForwardedResponse<Arc<RawValue>> {
|
||||
|
||||
impl From<Arc<RawValue>> for ForwardedResponse<Arc<RawValue>> {
|
||||
fn from(value: Arc<RawValue>) -> Self {
|
||||
let num_bytes = value.get().len();
|
||||
|
||||
let num_bytes = num_bytes as u32;
|
||||
let num_bytes = value.get().len() as u64;
|
||||
|
||||
Self::Result { value, num_bytes }
|
||||
}
|
||||
@ -187,13 +204,9 @@ impl From<Arc<RawValue>> for ForwardedResponse<Arc<RawValue>> {
|
||||
|
||||
impl From<Box<RawValue>> for ForwardedResponse<Arc<RawValue>> {
|
||||
fn from(value: Box<RawValue>) -> Self {
|
||||
let num_bytes = value.get().len();
|
||||
let value: Arc<RawValue> = value.into();
|
||||
|
||||
let num_bytes = num_bytes as u32;
|
||||
|
||||
let value = value.into();
|
||||
|
||||
Self::Result { value, num_bytes }
|
||||
value.into()
|
||||
}
|
||||
}
|
||||
|
||||
@ -206,8 +219,6 @@ impl TryFrom<Web3ProxyError> for ForwardedResponse<Arc<RawValue>> {
|
||||
Ok(x) => Ok(x.into()),
|
||||
Err(..) => Err(err.into()),
|
||||
},
|
||||
Web3ProxyError::NullJsonRpcResult => Ok(ForwardedResponse::NullResult),
|
||||
Web3ProxyError::JsonRpcResponse(x) => Ok(x),
|
||||
Web3ProxyError::JsonRpcErrorData(err) => Ok(err.into()),
|
||||
err => Err(err),
|
||||
}
|
||||
@ -228,27 +239,11 @@ impl TryFrom<Result<Arc<RawValue>, Web3ProxyError>> for ForwardedResponse<Arc<Ra
|
||||
}
|
||||
}
|
||||
}
|
||||
impl TryFrom<Result<Box<RawValue>, Web3ProxyError>> for ForwardedResponse<Arc<RawValue>> {
|
||||
type Error = Web3ProxyError;
|
||||
|
||||
fn try_from(value: Result<Box<RawValue>, Web3ProxyError>) -> Result<Self, Self::Error> {
|
||||
match value {
|
||||
Ok(x) => Ok(x.into()),
|
||||
Err(err) => {
|
||||
let x: Self = err.try_into()?;
|
||||
|
||||
Ok(x)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<R> From<JsonRpcErrorData> for ForwardedResponse<R> {
|
||||
fn from(value: JsonRpcErrorData) -> Self {
|
||||
// TODO: wrap the error in a complete response?
|
||||
let num_bytes = serde_json::to_string(&value).unwrap().len();
|
||||
|
||||
let num_bytes = num_bytes as u32;
|
||||
let num_bytes = serde_json::to_string(&value).unwrap().len() as u64;
|
||||
|
||||
Self::RpcError {
|
||||
error_data: value,
|
||||
@ -317,14 +312,16 @@ impl<'a> TryFrom<&'a WsClientError> for JsonRpcErrorData {
|
||||
pub struct JsonRpcResponseWeigher(pub u32);
|
||||
|
||||
impl JsonRpcResponseWeigher {
|
||||
pub fn weigh<K, R>(&self, _key: &K, value: &ForwardedResponse<R>) -> u32 {
|
||||
let x = value.num_bytes();
|
||||
|
||||
if x > self.0 {
|
||||
// return max. the item may start to be inserted into the cache, but it will be immediatly removed
|
||||
u32::MAX
|
||||
pub fn weigh<K, T>(&self, _key: &K, value: &ForwardedResponse<T>) -> u32 {
|
||||
if let Ok(x) = value.num_bytes().try_into() {
|
||||
if x > self.0 {
|
||||
// return max. the item may start to be inserted into the cache, but it will be immediatly removed
|
||||
u32::MAX
|
||||
} else {
|
||||
x
|
||||
}
|
||||
} else {
|
||||
x
|
||||
u32::MAX
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -346,21 +343,21 @@ mod tests {
|
||||
|
||||
let small_data: ForwardedResponse<Arc<RawValue>> = ForwardedResponse::Result {
|
||||
value: Box::<RawValue>::default().into(),
|
||||
num_bytes: max_item_weight / 2,
|
||||
num_bytes: (max_item_weight / 2) as u64,
|
||||
};
|
||||
|
||||
assert_eq!(weigher.weigh(&(), &small_data), max_item_weight / 2);
|
||||
|
||||
let max_sized_data: ForwardedResponse<Arc<RawValue>> = ForwardedResponse::Result {
|
||||
value: Box::<RawValue>::default().into(),
|
||||
num_bytes: max_item_weight,
|
||||
num_bytes: max_item_weight as u64,
|
||||
};
|
||||
|
||||
assert_eq!(weigher.weigh(&(), &max_sized_data), max_item_weight);
|
||||
|
||||
let oversized_data: ForwardedResponse<Arc<RawValue>> = ForwardedResponse::Result {
|
||||
value: Box::<RawValue>::default().into(),
|
||||
num_bytes: max_item_weight * 2,
|
||||
num_bytes: (max_item_weight * 2) as u64,
|
||||
};
|
||||
|
||||
assert_eq!(weigher.weigh(&(), &oversized_data), u32::MAX);
|
||||
|
@ -340,7 +340,7 @@ impl OpenRequestHandle {
|
||||
// true if we got a jsonrpc result. a jsonrpc error or other error is false.
|
||||
// TODO: counters for errors vs jsonrpc vs success?
|
||||
let response_is_success = match &response {
|
||||
Ok(jsonrpc::SingleResponse::Parsed(x)) => {
|
||||
Ok(jsonrpc::SingleResponse::Parsed(x, ..)) => {
|
||||
matches!(&x.payload, ResponsePayload::Success { .. })
|
||||
}
|
||||
Ok(jsonrpc::SingleResponse::Stream(..)) => true,
|
||||
@ -368,7 +368,7 @@ impl OpenRequestHandle {
|
||||
}
|
||||
|
||||
let response_type: ResponseType = match &response {
|
||||
Ok(jsonrpc::SingleResponse::Parsed(x)) => match &x.payload {
|
||||
Ok(jsonrpc::SingleResponse::Parsed(x, ..)) => match &x.payload {
|
||||
ResponsePayload::Success { .. } => unreachable!(),
|
||||
ResponsePayload::Error { error } => {
|
||||
trace!(?error, "jsonrpc error data");
|
||||
|
Loading…
Reference in New Issue
Block a user