diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 5c639226..2d007ff3 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -2,7 +2,7 @@ mod ws; use crate::caches::{RegisteredUserRateLimitKey, RpcSecretKeyCache, UserBalanceCache}; use crate::config::{AppConfig, TopConfig}; -use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; +use crate::errors::{RequestForError, Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::frontend::authorization::Authorization; use crate::globals::{global_db_conn, DatabaseError, APP, DB_CONN, DB_REPLICA}; use crate::jsonrpc::{ @@ -1269,7 +1269,8 @@ impl App { { Ok(x) => x, Err(err) => { - let (a, b) = err.as_json_response_parts(error_id); + // TODO: pass the original request into as_json_response_parts + let (a, b) = err.as_json_response_parts(error_id, None::); let rpcs = vec![]; @@ -1288,7 +1289,7 @@ impl App { // TODO: refresh the request here? // turn some of the Web3ProxyErrors into Ok results - match self._proxy_request_with_caching(web3_request).await { + match self._proxy_request_with_caching(&web3_request).await { Ok(response_data) => { last_success = Some(response_data); break; @@ -1309,30 +1310,9 @@ impl App { } } - // TODO: refresh the request instead of making new each time. then we need less clones - web3_request_result = ValidatedRequest::new_with_app( - self, - authorization.clone(), - None, - None, - request.clone().into(), - head_block.clone(), - ) - .await; + // TODO: refresh the request? } - let web3_request = match web3_request_result { - Ok(x) => x, - Err(err) => { - // i don't think we can get here, but just in case - let (a, b) = err.as_json_response_parts(error_id, Some(&request)); - - let rpcs = vec![]; - - return (a, b, rpcs); - } - }; - let last_response = if let Some(last_success) = last_success { Ok(last_success) } else { diff --git a/web3_proxy/src/errors.rs b/web3_proxy/src/errors.rs index 80078825..c114a92b 100644 --- a/web3_proxy/src/errors.rs +++ b/web3_proxy/src/errors.rs @@ -221,11 +221,14 @@ pub enum Web3ProxyError { #[derive(Default, From, Serialize)] pub enum RequestForError<'a> { /// sometimes we don't have a request object at all + /// TODO: attach Authorization to this, too #[default] None, /// sometimes parsing the request fails. Give them the original string + /// TODO: attach Authorization to this, too Unparsed(&'a str), /// sometimes we have json + /// TODO: attach Authorization to this, too SingleRequest(&'a SingleRequest), // sometimes we have json for a batch of requests // Batch(&'a BatchRequest), @@ -233,6 +236,16 @@ pub enum RequestForError<'a> { Validated(&'a ValidatedRequest), } +impl RequestForError<'_> { + pub fn started_active_premium(&self) -> bool { + match self { + Self::Validated(x) => x.started_active_premium, + // TODO: check authorization on more types + _ => false, + } + } +} + impl Web3ProxyError { pub fn as_json_response_parts<'a, R>( &self, @@ -759,7 +772,10 @@ impl Web3ProxyError { // TODO: different messages of cancelled or not? message: "Unable to complete request".into(), code: code.as_u16().into(), - data: Some(serde_json::Value::String(err.to_string())), + data: Some(json!({ + "request": request_for_error, + "err": err.to_string(), + })), }, ) } @@ -795,7 +811,7 @@ impl Web3ProxyError { // TODO: do this without clone? the Arc needed it though (StatusCode::OK, jsonrpc_error_data.clone()) } - Self::MdbxPanic(rpc, msg) => { + Self::MdbxPanic(rpc_name, msg) => { error!(%msg, "mdbx panic"); // TODO: this is bad enough that we should send something to pager duty @@ -805,7 +821,11 @@ impl Web3ProxyError { JsonRpcErrorData { message: "mdbx panic".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), - data: Some(json!({"rpc": rpc})), + data: Some(json!({ + "err": msg, + "request": request_for_error, + "rpc": rpc_name, + })), }, ) } @@ -833,6 +853,7 @@ impl Web3ProxyError { data: Some(json!({ "err": "Blocks here must have a number or hash", "extra": "you found a bug. please contact us if you see this and we can help figure out what happened. https://discord.llamanodes.com/", + "request": request_for_error, })), }, ) @@ -844,7 +865,9 @@ impl Web3ProxyError { JsonRpcErrorData { message: "no blocks known".into(), code: StatusCode::BAD_GATEWAY.as_u16().into(), - data: None, + data: Some(json!({ + "request": request_for_error, + })), }, ) } @@ -855,7 +878,9 @@ impl Web3ProxyError { JsonRpcErrorData { message: "no consensus head block".into(), code: StatusCode::BAD_GATEWAY.as_u16().into(), - data: None, + data: Some(json!({ + "request": request_for_error, + })), }, ) } @@ -878,7 +903,9 @@ impl Web3ProxyError { JsonRpcErrorData { message: "unable to retry for request handle".into(), code: StatusCode::BAD_GATEWAY.as_u16().into(), - data: None, + data: Some(json!({ + "request": request_for_error, + })), }, ) } @@ -900,7 +927,9 @@ impl Web3ProxyError { JsonRpcErrorData { message: "no servers synced".into(), code: StatusCode::BAD_GATEWAY.as_u16().into(), - data: None, + data: Some(json!({ + "request": request_for_error, + })), }, ) } @@ -912,13 +941,13 @@ impl Web3ProxyError { ( StatusCode::BAD_GATEWAY, JsonRpcErrorData { - message: format!( - "not enough rpcs connected {}/{}", - num_known, min_head_rpcs - ) - .into(), + message: "not enough rpcs connected".into(), code: StatusCode::BAD_GATEWAY.as_u16().into(), - data: None, + data: Some(json!({ + "known": num_known, + "needed": min_head_rpcs, + "request": request_for_error, + })), }, ) } @@ -932,6 +961,7 @@ impl Web3ProxyError { data: Some(json!({ "available": available, "needed": needed, + "request": request_for_error, })), }, ) @@ -956,8 +986,9 @@ impl Web3ProxyError { message: "RPC is lagged".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: Some(json!({ - "rpc": rpc.name, "head": old_head, + "request": request_for_error, + "rpc": rpc.name, })), }, ) @@ -1222,7 +1253,7 @@ impl Web3ProxyError { ) } Self::Timeout(x) => { - let data = if request_for_error.active_premium() { + let data = if request_for_error.started_active_premium() { json!({ "duration": x.as_ref().map(|x| x.as_secs_f32()), "request": request_for_error, diff --git a/web3_proxy/src/jsonrpc/request_builder.rs b/web3_proxy/src/jsonrpc/request_builder.rs index a371a2fd..9c2573b6 100644 --- a/web3_proxy/src/jsonrpc/request_builder.rs +++ b/web3_proxy/src/jsonrpc/request_builder.rs @@ -214,6 +214,10 @@ pub struct ValidatedRequest { pub inner: RequestOrMethod, + /// if the rpc key used for this request is premium (at the start of the request) + pub started_active_premium: bool, + + // TODO: everything under here should be behind a single lock. all these atomics need to be updated together! /// Instant that the request was received (or at least close to it) /// We use Instant and not timestamps to avoid problems with leap seconds and similar issues #[derivative(Default(value = "Instant::now()"))] @@ -332,7 +336,7 @@ impl ValidatedRequest { let stat_sender = app.and_then(|x| x.stat_sender.clone()); - // let request: RequestOrMethod = request.into(); + let started_active_premium = authorization.active_premium().await; // we VERY INTENTIONALLY log to kafka BEFORE calculating the cache key // this is because calculating the cache_key may modify the params! @@ -389,6 +393,7 @@ impl ValidatedRequest { response_millis: 0.into(), response_timestamp: 0.into(), start_instant, + started_active_premium, stat_sender, usd_per_cu, user_error_response: false.into(), diff --git a/web3_proxy_cli/src/sub_commands/migrate_stats_to_v2.rs b/web3_proxy_cli/src/sub_commands/migrate_stats_to_v2.rs index afea6891..db971070 100644 --- a/web3_proxy_cli/src/sub_commands/migrate_stats_to_v2.rs +++ b/web3_proxy_cli/src/sub_commands/migrate_stats_to_v2.rs @@ -210,6 +210,7 @@ impl MigrateStatsToV2SubCommand { response_timestamp: x.period_datetime.timestamp().into(), response_millis: int_response_millis.into(), stat_sender: Some(stat_sender.clone()), + started_active_premium: false, user_error_response: false.into(), usd_per_cu: top_config.app.usd_per_cu.unwrap_or_default(), cache_mode: Default::default(),