more detailed errors for timeouts and other things

Bryan Stitt 2023-11-14 16:22:47 -08:00
parent 9f2d4aa731
commit 3159844f5d
4 changed files with 58 additions and 41 deletions

@@ -2,7 +2,7 @@ mod ws;
use crate::caches::{RegisteredUserRateLimitKey, RpcSecretKeyCache, UserBalanceCache};
use crate::config::{AppConfig, TopConfig};
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::errors::{RequestForError, Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::authorization::Authorization;
use crate::globals::{global_db_conn, DatabaseError, APP, DB_CONN, DB_REPLICA};
use crate::jsonrpc::{
@@ -1269,7 +1269,8 @@ impl App {
{
Ok(x) => x,
Err(err) => {
let (a, b) = err.as_json_response_parts(error_id);
// TODO: pass the original request into as_json_response_parts
let (a, b) = err.as_json_response_parts(error_id, None::<RequestForError>);
let rpcs = vec![];
@@ -1288,7 +1289,7 @@
// TODO: refresh the request here?
// turn some of the Web3ProxyErrors into Ok results
match self._proxy_request_with_caching(web3_request).await {
match self._proxy_request_with_caching(&web3_request).await {
Ok(response_data) => {
last_success = Some(response_data);
break;
@@ -1309,30 +1310,9 @@
}
}
// TODO: refresh the request instead of making new each time. then we need less clones
web3_request_result = ValidatedRequest::new_with_app(
self,
authorization.clone(),
None,
None,
request.clone().into(),
head_block.clone(),
)
.await;
// TODO: refresh the request?
}
let web3_request = match web3_request_result {
Ok(x) => x,
Err(err) => {
// i don't think we can get here, but just in case
let (a, b) = err.as_json_response_parts(error_id, Some(&request));
let rpcs = vec![];
return (a, b, rpcs);
}
};
let last_response = if let Some(last_success) = last_success {
Ok(last_success)
} else {

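The app-side changes above do two things: as_json_response_parts now receives the original request (or an explicit None::<RequestForError> when none is available), and the retry loop borrows the same ValidatedRequest on every attempt instead of rebuilding it with ValidatedRequest::new_with_app. A rough, standalone sketch of that retry shape, using stand-in types rather than the real app structs:

// Illustrative stand-ins; not the real web3-proxy types.
struct ValidatedRequest {
    method: String,
}

enum ProxyError {
    Timeout,
    Fatal(String),
}

// pretend backend call that times out twice before succeeding
fn try_backend(request: &ValidatedRequest, attempt: u32) -> Result<String, ProxyError> {
    if attempt < 2 {
        Err(ProxyError::Timeout)
    } else {
        Ok(format!("response for {}", request.method))
    }
}

fn proxy_with_retries(request: &ValidatedRequest) -> Result<String, ProxyError> {
    let mut last_err = None;

    for attempt in 0..3 {
        // the request is only borrowed per attempt, so nothing is cloned or
        // re-validated between retries
        match try_backend(request, attempt) {
            Ok(response) => return Ok(response),
            Err(ProxyError::Timeout) => last_err = Some(ProxyError::Timeout),
            Err(other) => return Err(other),
        }
    }

    Err(last_err.unwrap_or(ProxyError::Fatal("no attempts made".into())))
}

fn main() {
    let request = ValidatedRequest {
        method: "eth_blockNumber".into(),
    };

    match proxy_with_retries(&request) {
        Ok(response) => println!("{response}"),
        Err(_) => eprintln!("request failed after retries"),
    }
}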
@@ -221,11 +221,14 @@ pub enum Web3ProxyError {
#[derive(Default, From, Serialize)]
pub enum RequestForError<'a> {
/// sometimes we don't have a request object at all
/// TODO: attach Authorization to this, too
#[default]
None,
/// sometimes parsing the request fails. Give them the original string
/// TODO: attach Authorization to this, too
Unparsed(&'a str),
/// sometimes we have json
/// TODO: attach Authorization to this, too
SingleRequest(&'a SingleRequest),
// sometimes we have json for a batch of requests
// Batch(&'a BatchRequest),
@@ -233,6 +236,16 @@ pub enum RequestForError<'a> {
Validated(&'a ValidatedRequest),
}
impl RequestForError<'_> {
pub fn started_active_premium(&self) -> bool {
match self {
Self::Validated(x) => x.started_active_premium,
// TODO: check authorization on more types
_ => false,
}
}
}
impl Web3ProxyError {
pub fn as_json_response_parts<'a, R>(
&self,
@@ -759,7 +772,10 @@ impl Web3ProxyError {
// TODO: different messages of cancelled or not?
message: "Unable to complete request".into(),
code: code.as_u16().into(),
data: Some(serde_json::Value::String(err.to_string())),
data: Some(json!({
"request": request_for_error,
"err": err.to_string(),
})),
},
)
}
@@ -795,7 +811,7 @@ impl Web3ProxyError {
// TODO: do this without clone? the Arc needed it though
(StatusCode::OK, jsonrpc_error_data.clone())
}
Self::MdbxPanic(rpc, msg) => {
Self::MdbxPanic(rpc_name, msg) => {
error!(%msg, "mdbx panic");
// TODO: this is bad enough that we should send something to pager duty
@@ -805,7 +821,11 @@
JsonRpcErrorData {
message: "mdbx panic".into(),
code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(),
data: Some(json!({"rpc": rpc})),
data: Some(json!({
"err": msg,
"request": request_for_error,
"rpc": rpc_name,
})),
},
)
}
@@ -833,6 +853,7 @@ impl Web3ProxyError {
data: Some(json!({
"err": "Blocks here must have a number or hash",
"extra": "you found a bug. please contact us if you see this and we can help figure out what happened. https://discord.llamanodes.com/",
"request": request_for_error,
})),
},
)
@@ -844,7 +865,9 @@
JsonRpcErrorData {
message: "no blocks known".into(),
code: StatusCode::BAD_GATEWAY.as_u16().into(),
data: None,
data: Some(json!({
"request": request_for_error,
})),
},
)
}
@@ -855,7 +878,9 @@
JsonRpcErrorData {
message: "no consensus head block".into(),
code: StatusCode::BAD_GATEWAY.as_u16().into(),
data: None,
data: Some(json!({
"request": request_for_error,
})),
},
)
}
@@ -878,7 +903,9 @@
JsonRpcErrorData {
message: "unable to retry for request handle".into(),
code: StatusCode::BAD_GATEWAY.as_u16().into(),
data: None,
data: Some(json!({
"request": request_for_error,
})),
},
)
}
@@ -900,7 +927,9 @@
JsonRpcErrorData {
message: "no servers synced".into(),
code: StatusCode::BAD_GATEWAY.as_u16().into(),
data: None,
data: Some(json!({
"request": request_for_error,
})),
},
)
}
@@ -912,13 +941,13 @@
(
StatusCode::BAD_GATEWAY,
JsonRpcErrorData {
message: format!(
"not enough rpcs connected {}/{}",
num_known, min_head_rpcs
)
.into(),
message: "not enough rpcs connected".into(),
code: StatusCode::BAD_GATEWAY.as_u16().into(),
data: None,
data: Some(json!({
"known": num_known,
"needed": min_head_rpcs,
"request": request_for_error,
})),
},
)
}
@@ -932,6 +961,7 @@ impl Web3ProxyError {
data: Some(json!({
"available": available,
"needed": needed,
"request": request_for_error,
})),
},
)
@@ -956,8 +986,9 @@
message: "RPC is lagged".into(),
code: StatusCode::BAD_REQUEST.as_u16().into(),
data: Some(json!({
"rpc": rpc.name,
"head": old_head,
"request": request_for_error,
"rpc": rpc.name,
})),
},
)
@@ -1222,7 +1253,7 @@ impl Web3ProxyError {
)
}
Self::Timeout(x) => {
let data = if request_for_error.active_premium() {
let data = if request_for_error.started_active_premium() {
json!({
"duration": x.as_ref().map(|x| x.as_secs_f32()),
"request": request_for_error,

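Most error payloads in this file now attach a "request" field built from the borrowed RequestForError, and the Timeout branch only includes the measured duration when the key started the request as premium. A cut-down sketch of how that serializes, assuming serde (with derive) and serde_json; the trimmed enum and hard-coded premium check below stand in for the real ones:

use serde::Serialize;
use serde_json::json;

// cut-down stand-in for RequestForError; the real enum also has Validated
// and SingleRequest variants and derives From
#[derive(Default, Serialize)]
enum RequestForError<'a> {
    #[default]
    None,
    Unparsed(&'a str),
}

impl RequestForError<'_> {
    // stands in for started_active_premium(); the real impl reads the flag
    // off the Validated variant, this demo just pretends Unparsed is premium
    fn started_active_premium(&self) -> bool {
        matches!(self, Self::Unparsed(_))
    }
}

fn main() {
    let request_for_error = RequestForError::Unparsed(r#"{"method":"eth_blockNumber"}"#);

    // premium keys get the timeout duration attached; everyone gets the
    // original request echoed back in the error data
    let data = if request_for_error.started_active_premium() {
        json!({
            "duration": 30.0,
            "request": request_for_error,
        })
    } else {
        json!({
            "request": request_for_error,
        })
    };

    println!("{data}");
    // prints something like:
    // {"duration":30.0,"request":{"Unparsed":"{\"method\":\"eth_blockNumber\"}"}}
}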
@@ -214,6 +214,10 @@ pub struct ValidatedRequest {
pub inner: RequestOrMethod,
/// if the rpc key used for this request is premium (at the start of the request)
pub started_active_premium: bool,
// TODO: everything under here should be behind a single lock. all these atomics need to be updated together!
/// Instant that the request was received (or at least close to it)
/// We use Instant and not timestamps to avoid problems with leap seconds and similar issues
#[derivative(Default(value = "Instant::now()"))]
@@ -332,7 +336,7 @@ impl ValidatedRequest {
let stat_sender = app.and_then(|x| x.stat_sender.clone());
// let request: RequestOrMethod = request.into();
let started_active_premium = authorization.active_premium().await;
// we VERY INTENTIONALLY log to kafka BEFORE calculating the cache key
// this is because calculating the cache_key may modify the params!
@@ -389,6 +393,7 @@ impl ValidatedRequest {
response_millis: 0.into(),
response_timestamp: 0.into(),
start_instant,
started_active_premium,
stat_sender,
usd_per_cu,
user_error_response: false.into(),

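The new started_active_premium field snapshots the key's premium status once, when the ValidatedRequest is built, so later error paths (like the Timeout branch above) see the status as it was at the start of the request rather than re-checking mid-flight. A minimal sketch of that pattern, again with stand-in types:

// Illustrative stand-ins, not the real web3-proxy structs.
struct Authorization {
    balance_cents: u64,
}

impl Authorization {
    // the real check is async and consults the user balance cache
    fn active_premium(&self) -> bool {
        self.balance_cents > 0
    }
}

struct ValidatedRequest {
    // snapshot taken at construction time, so it cannot change mid-request
    started_active_premium: bool,
}

impl ValidatedRequest {
    fn new(authorization: &Authorization) -> Self {
        Self {
            started_active_premium: authorization.active_premium(),
        }
    }
}

fn main() {
    let authorization = Authorization { balance_cents: 500 };
    let request = ValidatedRequest::new(&authorization);
    assert!(request.started_active_premium);
}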
@@ -210,6 +210,7 @@ impl MigrateStatsToV2SubCommand {
response_timestamp: x.period_datetime.timestamp().into(),
response_millis: int_response_millis.into(),
stat_sender: Some(stat_sender.clone()),
started_active_premium: false,
user_error_response: false.into(),
usd_per_cu: top_config.app.usd_per_cu.unwrap_or_default(),
cache_mode: Default::default(),