From bbeff03452a3e49398f8a26177362bc0e2270e9d Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 6 Nov 2023 22:52:42 -0800 Subject: [PATCH] wip --- web3_proxy/src/app/mod.rs | 27 ++- web3_proxy/src/errors.rs | 201 +++++++++++++++++----- web3_proxy/src/frontend/authorization.rs | 6 +- web3_proxy/src/frontend/rpc_proxy_http.rs | 18 +- web3_proxy/src/frontend/rpc_proxy_ws.rs | 8 +- web3_proxy/src/jsonrpc/request.rs | 4 +- 6 files changed, 204 insertions(+), 60 deletions(-) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 159f138d..5c639226 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -1288,7 +1288,7 @@ impl App { // TODO: refresh the request here? // turn some of the Web3ProxyErrors into Ok results - match self._proxy_request_with_caching(&web3_request).await { + match self._proxy_request_with_caching(web3_request).await { Ok(response_data) => { last_success = Some(response_data); break; @@ -1308,8 +1308,31 @@ impl App { break; } } + + // TODO: refresh the request instead of making new each time. then we need less clones + web3_request_result = ValidatedRequest::new_with_app( + self, + authorization.clone(), + None, + None, + request.clone().into(), + head_block.clone(), + ) + .await; } + let web3_request = match web3_request_result { + Ok(x) => x, + Err(err) => { + // i don't think we can get here, but just in case + let (a, b) = err.as_json_response_parts(error_id, Some(&request)); + + let rpcs = vec![]; + + return (a, b, rpcs); + } + }; + let last_response = if let Some(last_success) = last_success { Ok(last_success) } else { @@ -1335,7 +1358,7 @@ impl App { .user_error_response .store(false, Ordering::SeqCst); - err.as_json_response_parts(web3_request.id()) + err.as_json_response_parts(web3_request.id(), Some(web3_request.as_ref())) } }; diff --git a/web3_proxy/src/errors.rs b/web3_proxy/src/errors.rs index 51ea5865..3b8f64e1 100644 --- a/web3_proxy/src/errors.rs +++ b/web3_proxy/src/errors.rs @@ -2,7 +2,9 @@ use crate::block_number::BlockNumOrHash; use crate::frontend::authorization::Authorization; -use crate::jsonrpc::{self, JsonRpcErrorData, ParsedResponse, StreamResponse}; +use crate::jsonrpc::{ + self, JsonRpcErrorData, ParsedResponse, SingleRequest, StreamResponse, ValidatedRequest, +}; use crate::response_cache::ForwardedResponse; use crate::rpcs::blockchain::Web3ProxyBlock; use crate::rpcs::one::Web3Rpc; @@ -26,6 +28,7 @@ use redis_rate_limiter::redis::RedisError; use redis_rate_limiter::RedisPoolError; use reqwest::header::ToStrError; use rust_decimal::Error as DecimalError; +use serde::Serialize; use serde_json::json; use serde_json::value::RawValue; use siwe::VerificationError; @@ -215,12 +218,31 @@ pub enum Web3ProxyError { WithContext(Option>, Cow<'static, str>), } +#[derive(Default, From, Serialize)] +pub enum RequestForError<'a> { + /// sometimes we don't have a request object at all + #[default] + None, + /// sometimes parsing the request fails. Give them the original string + Unparsed(&'a str), + /// sometimes we have json + SingleRequest(&'a SingleRequest), + // sometimes we have json for a batch of requests + // Batch(&'a BatchRequest), + /// assuming things went well, we have a validated request + Validated(&'a ValidatedRequest), +} + impl Web3ProxyError { - pub fn as_json_response_parts( + pub fn as_json_response_parts<'a, R>( &self, id: Box, - ) -> (StatusCode, jsonrpc::SingleResponse) { - let (code, response_data) = self.as_response_parts(); + request_for_error: Option, + ) -> (StatusCode, jsonrpc::SingleResponse) + where + R: Into>, + { + let (code, response_data) = self.as_response_parts(request_for_error); let response = jsonrpc::ParsedResponse::from_response_data(response_data, id); (code, response.into()) } @@ -228,7 +250,16 @@ impl Web3ProxyError { /// turn the error into an axum response. /// /// TODO? change to `to_response_parts(self)` - pub fn as_response_parts(&self) -> (StatusCode, ForwardedResponse>) { + pub fn as_response_parts<'a, R>( + &self, + request_for_error: Option, + ) -> (StatusCode, ForwardedResponse>) + where + R: Into>, + { + let request_for_error: RequestForError<'_> = + request_for_error.map(Into::into).unwrap_or_default(); + // TODO: include a unique request id in the data let (code, err): (StatusCode, JsonRpcErrorData) = match self { Self::Abi(err) => { @@ -238,7 +269,10 @@ impl Web3ProxyError { JsonRpcErrorData { message: "abi error".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), - data: Some(serde_json::Value::String(err.to_string())), + data: Some(json!({ + "request": request_for_error, + "err": err.to_string(), + })), }, ) } @@ -250,7 +284,9 @@ impl Web3ProxyError { JsonRpcErrorData { message: format!("FORBIDDEN: {}", msg).into(), code: StatusCode::FORBIDDEN.as_u16().into(), - data: None, + data: Some(json!({ + "request": request_for_error, + })), }, ) } @@ -263,6 +299,7 @@ impl Web3ProxyError { message: "Archive data required".into(), code: StatusCode::OK.as_u16().into(), data: Some(json!({ + "request": request_for_error, "min": min, "max": max, })), @@ -277,12 +314,15 @@ impl Web3ProxyError { // TODO: is it safe to expose all of our anyhow strings? message: "INTERNAL SERVER ERROR".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), - data: Some(serde_json::Value::String(err.to_string())), + data: Some(json!({ + "request": request_for_error, + "err": err.to_string(), + })), }, ) } Self::Arc(err) => { - return err.as_response_parts(); + return err.as_response_parts(Some(request_for_error)); } Self::BadRequest(err) => { trace!(?err, "BAD_REQUEST"); @@ -291,7 +331,10 @@ impl Web3ProxyError { JsonRpcErrorData { message: "bad request".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), - data: Some(serde_json::Value::String(err.to_string())), + data: Some(json!({ + "request": request_for_error, + "err": err.to_string(), + })), }, ) } @@ -303,7 +346,10 @@ impl Web3ProxyError { JsonRpcErrorData { message: "bad response".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), - data: Some(serde_json::Value::String(err.to_string())), + data: Some(json!({ + "request": request_for_error, + "err": err.to_string(), + })), }, ) } @@ -314,7 +360,9 @@ impl Web3ProxyError { JsonRpcErrorData { message: "bad routing".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), - data: None, + data: Some(json!({ + "request": request_for_error, + })), }, ) } @@ -325,7 +373,10 @@ impl Web3ProxyError { JsonRpcErrorData { message: "contract error".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), - data: Some(serde_json::Value::String(err.to_string())), + data: Some(json!({ + "request": request_for_error, + "err": err.to_string(), + })), }, ) } @@ -336,7 +387,10 @@ impl Web3ProxyError { JsonRpcErrorData { message: "database error!".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), - data: Some(serde_json::Value::String(err.to_string())), + data: Some(json!({ + "request": request_for_error, + "err": err.to_string(), + })), }, ) } @@ -347,7 +401,10 @@ impl Web3ProxyError { JsonRpcErrorData { message: "database (arc) error!".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), - data: Some(serde_json::Value::String(err.to_string())), + data: Some(json!({ + "request": request_for_error, + "err": err.to_string(), + })), }, ) } @@ -358,7 +415,10 @@ impl Web3ProxyError { JsonRpcErrorData { message: "decimal error".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), - data: Some(serde_json::Value::String(err.to_string())), + data: Some(json!({ + "request": request_for_error, + "err": err.to_string(), + })), }, ) } @@ -374,7 +434,10 @@ impl Web3ProxyError { JsonRpcErrorData { message: "ethers http client error".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), - data: Some(serde_json::Value::String(err.to_string())), + data: Some(json!({ + "request": request_for_error, + "err": err.to_string(), + })), }, ) } @@ -391,7 +454,10 @@ impl Web3ProxyError { JsonRpcErrorData { message: "ethers provider error".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), - data: Some(serde_json::Value::String(err.to_string())), + data: Some(json!({ + "request": request_for_error, + "err": err.to_string(), + })), }, ) } @@ -408,7 +474,10 @@ impl Web3ProxyError { JsonRpcErrorData { message: "ethers ws client error".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), - data: Some(serde_json::Value::String(err.to_string())), + data: Some(json!({ + "request": request_for_error, + "err": err.to_string(), + })), }, ) } @@ -423,6 +492,7 @@ impl Web3ProxyError { data: Some(json!({ "head": head, "requested": requested, + "request": request_for_error, })), }, ) @@ -435,7 +505,9 @@ impl Web3ProxyError { JsonRpcErrorData { message: "gas estimate result is not an U256".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), - data: None, + data: Some(json!({ + "request": request_for_error, + })), }, ) } @@ -446,7 +518,10 @@ impl Web3ProxyError { JsonRpcErrorData { message: "hdr record error".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), - data: Some(serde_json::Value::String(err.to_string())), + data: Some(json!({ + "request": request_for_error, + "err": err.to_string(), + })), }, ) } @@ -457,7 +532,10 @@ impl Web3ProxyError { JsonRpcErrorData { message: "headers error".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), - data: Some(serde_json::Value::String(err.to_string())), + data: Some(json!({ + "request": request_for_error, + "err": err.to_string(), + })), }, ) } @@ -468,7 +546,10 @@ impl Web3ProxyError { JsonRpcErrorData { message: "header to string error".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), - data: Some(serde_json::Value::String(err.to_string())), + data: Some(json!({ + "request": request_for_error, + "err": err.to_string(), + })), }, ) } @@ -479,7 +560,10 @@ impl Web3ProxyError { JsonRpcErrorData { message: err.to_string().into(), code: StatusCode::BAD_REQUEST.as_u16().into(), - data: Some(serde_json::Value::String(err.to_string())), + data: Some(json!({ + "request": request_for_error, + "err": err.to_string(), + })), }, ) } @@ -491,7 +575,10 @@ impl Web3ProxyError { // TODO: is it safe to expose these error strings? message: err.to_string().into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), - data: Some(serde_json::Value::String(err.to_string())), + data: Some(json!({ + "request": request_for_error, + "err": err.to_string(), + })), }, ) } @@ -503,7 +590,10 @@ impl Web3ProxyError { JsonRpcErrorData { message: "influxdb2 error!".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), - data: Some(serde_json::Value::String(err.to_string())), + data: Some(json!({ + "request": request_for_error, + "err": err.to_string(), + })), }, ) } @@ -517,6 +607,7 @@ impl Web3ProxyError { data: Some(json!({ "min": min, "max": max, + "request": request_for_error, })), }, ) @@ -528,7 +619,9 @@ impl Web3ProxyError { JsonRpcErrorData { message: err.to_string().into(), code: StatusCode::BAD_REQUEST.as_u16().into(), - data: None, + data: Some(json!({ + "request": request_for_error, + })), }, ) } @@ -539,7 +632,9 @@ impl Web3ProxyError { JsonRpcErrorData { message: "IP is not allowed!".into(), code: StatusCode::FORBIDDEN.as_u16().into(), - data: Some(serde_json::Value::String(ip.to_string())), + data: Some(json!({ + "ip": ip + })), }, ) } @@ -550,7 +645,10 @@ impl Web3ProxyError { JsonRpcErrorData { message: "invalid header value".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), - data: Some(serde_json::Value::String(err.to_string())), + data: Some(json!({ + "request": request_for_error, + "err": err.to_string(), + })), }, ) } @@ -584,7 +682,10 @@ impl Web3ProxyError { message: "std io".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), // TODO: is it safe to expose our io error strings? - data: Some(serde_json::Value::String(err.to_string())), + data: Some(json!({ + "request": request_for_error, + "err": err.to_string(), + })), }, ) } @@ -683,7 +784,10 @@ impl Web3ProxyError { JsonRpcErrorData { message: message.into(), code: code.into(), - data: Some(serde_json::Value::String(err.to_string())), + data: Some(json!({ + "request": request_for_error, + "err": err.to_string(), + })), }, ) } @@ -724,9 +828,12 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: "Blocks here must have a number or hash".into(), + message: "Internal server error".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), - data: None, + data: Some(json!({ + "err": "Blocks here must have a number or hash", + "extra": "you found a bug. please contact us if you see this and we can help figure out what happened. https://discord.llamanodes.com/", + })), }, ) } @@ -1261,7 +1368,7 @@ impl Web3ProxyError { Self::WithContext(err, msg) => match err { Some(err) => { warn!(?err, %msg, "error w/ context"); - return err.as_response_parts(); + return err.as_response_parts(Some(request_for_error)); } None => { warn!(%msg, "error w/ context"); @@ -1280,9 +1387,15 @@ impl Web3ProxyError { (code, ForwardedResponse::from(err)) } - #[inline] - pub fn into_response_with_id(self, id: Option>) -> Response { - let (status_code, response_data) = self.as_response_parts(); + pub fn into_response_with_id<'a, R>( + self, + id: Option>, + request_for_error: Option, + ) -> Response + where + R: Into>, + { + let (status_code, response_data) = self.as_response_parts(request_for_error); let id = id.unwrap_or_default(); @@ -1292,7 +1405,7 @@ impl Web3ProxyError { } /// some things should keep going even if the db is down - pub fn split_db_errors(&self) -> Result<&Self, &Self> { + pub fn ok_db_errors(&self) -> Result<&Self, &Self> { match self { Web3ProxyError::NoDatabaseConfigured => Ok(self), Web3ProxyError::Database(err) => { @@ -1305,7 +1418,7 @@ impl Web3ProxyError { } Web3ProxyError::Arc(x) => { // errors from inside moka cache helpers are wrapped in an Arc - x.split_db_errors() + x.ok_db_errors() } _ => Err(self), } @@ -1326,8 +1439,9 @@ impl From for Web3ProxyError { impl IntoResponse for Web3ProxyError { #[inline] + /// TODO: maybe we don't want this anymore. maybe we want to require a web3_request? fn into_response(self) -> Response { - self.into_response_with_id(Default::default()) + self.into_response_with_id(Default::default(), RequestForError::None) } } @@ -1351,8 +1465,11 @@ where } impl Web3ProxyError { - pub fn into_message(self, id: Option>) -> Message { - let (_, err) = self.as_response_parts(); + pub fn into_message<'a, R>(self, id: Option>, web3_request: R) -> Message + where + R: Into>, + { + let (_, err) = self.as_response_parts(web3_request); let id = id.unwrap_or_default(); diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 5d810e71..9103e7d1 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -4,7 +4,7 @@ use super::rpc_proxy_ws::ProxyMode; use crate::app::{App, APP_USER_AGENT}; use crate::balance::Balance; use crate::caches::RegisteredUserRateLimitKey; -use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; +use crate::errors::{RequestForError, Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::globals::global_db_replica_conn; use crate::jsonrpc::{self, SingleRequest}; use crate::secrets::RpcSecretKey; @@ -192,7 +192,7 @@ impl ResponseOrBytes<'_> { Self::Response(x) => x.num_bytes(), Self::Bytes(num_bytes) => *num_bytes, Self::Error(x) => { - let (_, x) = x.as_response_parts(); + let (_, x) = x.as_response_parts(RequestForError::None); x.num_bytes() } @@ -894,7 +894,7 @@ impl App { let authorization_checks = match self.authorization_checks(proxy_mode, rpc_key).await { Ok(x) => x, Err(err) => { - if let Ok(_err) = err.split_db_errors() { + if let Ok(_err) = err.ok_db_errors() { // // TODO: this is too verbose during an outage. the warnings on the config reloader should be fine // warn!( // ?err, diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs index 0b5c8169..24d36ba0 100644 --- a/web3_proxy/src/frontend/rpc_proxy_http.rs +++ b/web3_proxy/src/frontend/rpc_proxy_http.rs @@ -81,6 +81,7 @@ pub async fn versus_proxy_web3_rpc( .await } +/// TODO: refactor this to use the builder pattern async fn _proxy_web3_rpc( app: Arc, ip: &IpAddr, @@ -91,14 +92,14 @@ async fn _proxy_web3_rpc( ) -> Result { // TODO: create a stat if they error. (but we haven't parsed rpc_key yet, so it needs some thought) let payload = payload - .map_err(|e| Web3ProxyError::from(e).into_response_with_id(None))? + .map_err(|e| Web3ProxyError::from(e).into_response_with_id(None, None))? .0; let first_id = payload.first_id(); let authorization = ip_is_authorized(&app, ip, origin, proxy_mode) .await - .map_err(|e| e.into_response_with_id(first_id.clone()))?; + .map_err(|e| e.into_response_with_id(first_id.clone(), None))?; let authorization = Arc::new(authorization); @@ -109,10 +110,11 @@ async fn _proxy_web3_rpc( // TODO: calculate payload bytes here (before turning into serde_json::Value). that will save serializing later // TODO: is first_id the right thing to attach to this error? + // TODO: i think we want to attach the web3_request here. but that means we need to create it here let (status_code, response, rpcs) = app .proxy_web3_rpc(authorization, payload, Some(request_id)) .await - .map_err(|e| e.into_response_with_id(first_id))?; + .map_err(|e| e.into_response_with_id(first_id, None))?; let mut response = (status_code, response).into_response(); @@ -282,6 +284,7 @@ pub async fn versus_proxy_web3_rpc_with_key( .await } +/// TODO: refactor this to use the builder pattern #[allow(clippy::too_many_arguments)] async fn _proxy_web3_rpc_with_key( app: Arc, @@ -297,19 +300,19 @@ async fn _proxy_web3_rpc_with_key( // TODO: DRY w/ proxy_web3_rpc // TODO: create a stat if they error. (but we haven't parsed rpc_key yet, so it needs some thought) let payload = payload - .map_err(|e| Web3ProxyError::from(e).into_response_with_id(None))? + .map_err(|e| Web3ProxyError::from(e).into_response_with_id(None, None))? .0; let first_id = payload.first_id(); let rpc_key = rpc_key .parse() - .map_err(|e: Web3ProxyError| e.into_response_with_id(first_id.clone()))?; + .map_err(|e: Web3ProxyError| e.into_response_with_id(first_id.clone(), None))?; let authorization = key_is_authorized(&app, &rpc_key, ip, origin, proxy_mode, referer, user_agent) .await - .map_err(|e| e.into_response_with_id(first_id.clone()))?; + .map_err(|e| e.into_response_with_id(first_id.clone(), None))?; let authorization = Arc::new(authorization); @@ -319,10 +322,11 @@ async fn _proxy_web3_rpc_with_key( let rpc_secret_key_id = authorization.checks.rpc_secret_key_id; + // TODO: pass web3_request to the map_err let (status_code, response, rpcs) = app .proxy_web3_rpc(authorization, payload, Some(request_id)) .await - .map_err(|e| e.into_response_with_id(first_id))?; + .map_err(|e| e.into_response_with_id(first_id, None))?; let mut response = (status_code, response).into_response(); diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index 6a199627..297ba66a 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -3,7 +3,7 @@ //! WebSockets are the preferred method of receiving requests, but not all clients have good support. use super::authorization::{ip_is_authorized, key_is_authorized, Authorization}; -use crate::errors::{Web3ProxyError, Web3ProxyResponse}; +use crate::errors::{RequestForError, Web3ProxyError, Web3ProxyResponse}; use crate::jsonrpc::{self, ParsedResponse, ValidatedRequest}; use crate::{app::App, errors::Web3ProxyResult, jsonrpc::SingleRequest}; use axum::headers::{Origin, Referer, UserAgent}; @@ -448,7 +448,7 @@ async fn handle_socket_payload( let response_str = match response { Ok(x) => x.to_json_string().await?, Err(err) => { - let (_, response_data) = err.as_response_parts(); + let (_, response_data) = err.as_response_parts(RequestForError::None); let response = ParsedResponse::from_response_data(response_data, response_id); @@ -498,7 +498,7 @@ async fn read_web3_socket( Ok((m, s)) => (m, Some(s)), Err(err) => { // TODO: how can we get the id out of the payload? - let m = err.into_message(None); + let m = err.into_message(None, None); (m, None) } } @@ -532,7 +532,7 @@ async fn read_web3_socket( Ok((m, s)) => (m, Some(s)), Err(err) => { // TODO: how can we get the id out of the payload? - let m = err.into_message(None); + let m = err.into_message(None, None); (m, None) } }; diff --git a/web3_proxy/src/jsonrpc/request.rs b/web3_proxy/src/jsonrpc/request.rs index 69a668d4..4b279193 100644 --- a/web3_proxy/src/jsonrpc/request.rs +++ b/web3_proxy/src/jsonrpc/request.rs @@ -1,6 +1,6 @@ use super::LooseId; use crate::app::App; -use crate::errors::Web3ProxyError; +use crate::errors::{Web3ProxyError, RequestForError}; use crate::frontend::authorization::{Authorization, RequestOrMethod}; use crate::jsonrpc::ValidatedRequest; use axum::response::Response as AxumResponse; @@ -142,7 +142,7 @@ impl JsonRpcRequestEnum { request.add_response(&response); - let response = response.into_response_with_id(Some(err_id)); + let response = response.into_response_with_id(Some(err_id), RequestForError::None); // TODO: variable duration depending on the IP sleep(duration).await;