diff --git a/web3_proxy/src/http_params.rs b/web3_proxy/src/http_params.rs index 754857cf..ddbccdfc 100644 --- a/web3_proxy/src/http_params.rs +++ b/web3_proxy/src/http_params.rs @@ -224,8 +224,7 @@ pub fn get_query_window_seconds_from_params( |query_window_seconds: &String| { // parse the given timestamp query_window_seconds.parse::().map_err(|err| { - trace!("Unable to parse rpc_key_id: {:#?}", err); - Web3ProxyError::BadRequest("Unable to parse rpc_key_id".to_string()) + Web3ProxyError::BadRequest("Unable to parse query_window_seconds".to_string()) }) }, ) diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs index c9f73643..209f2d1c 100644 --- a/web3_proxy/src/stats/influxdb_queries.rs +++ b/web3_proxy/src/stats/influxdb_queries.rs @@ -26,7 +26,6 @@ use serde_json::{json, Number, Value}; // This type-API is extremely brittle! Make sure that the types conform 1-to-1 as defined here // https://docs.rs/influxdb2-structmap/0.2.0/src/influxdb2_structmap/value.rs.html#1-98 -// TODO: Run rustformat on it to see what the compiled produces for this #[derive(Debug, Default, FromDataPoint, Serialize)] pub struct AggregatedRpcAccounting { chain_id: String, @@ -48,12 +47,6 @@ pub struct DetailedRpcAccounting { method: String, } -// pub struct AggregatedRpcAccountingErrors { -// field: String, -// time: DateTime, -// archive_needed: f64 -// } - pub async fn query_user_stats<'a>( app: &'a Web3ProxyApp, bearer: Option>>, @@ -91,7 +84,7 @@ pub async fn query_user_stats<'a>( // Return a bad request if query_start == query_stop, because then the query is empty basically if query_start == query_stop { return Err(Web3ProxyError::BadRequest( - "Start and Stop date cannot be equal. Please specify a (different) start date." + "query_start and query_stop date cannot be equal. Please specify a different range" .to_owned(), )); } @@ -102,25 +95,13 @@ pub async fn query_user_stats<'a>( "opt_in_proxy" }; - // from(bucket: "dev_web3_proxy") - // |> range(start: v.timeRangeStart, stop: v.timeRangeStop) - // |> filter(fn: (r) => r["_measurement"] == "opt_in_proxy" or r["_measurement"] == "global_proxy") - // |> filter(fn: (r) => r["_field"] == "frontend_requests" or r["_field"] == "backend_requests" or r["_field"] == "sum_request_bytes") - // |> group(columns: ["_field", "_measurement"]) - // |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false) - // |> yield(name: "mean") - - // TODO: Should be taken from the config, not hardcoded ... - // TODO: Turn into a 500 error if bucket is not found .. - // Or just unwrap or so let bucket = &app .config .influxdb_bucket .clone() - .context("No influxdb bucket was provided")?; // "web3_proxy"; + .context("No influxdb bucket was provided")?; trace!("Bucket is {:?}", bucket); - // , "archive_needed", "error_response" let mut group_columns = vec![ "chain_id", "_measurement", @@ -158,33 +139,12 @@ pub async fn query_user_stats<'a>( StatType::Detailed => "".to_string(), }; - trace!( - "Query start and stop are: {:?} {:?}", - query_start, - query_stop - ); - trace!("Query column parameters are: {:?}", stats_column); - trace!("Query measurement is: {:?}", measurement); - trace!("Filters are: {:?} {:?}", filter_field, filter_chain_id); - trace!("Group is: {:?}", group); - trace!("window seconds are: {:?}", query_window_seconds); - - // These are taken care of probably ... - // reg. fields, collect: backend_requests, frontend_requests, cache_hits, cache_misses, total_request_bytes, total_response_bytes, total_response_millis - // "total_frontend_requests": "6", - // "total_response_bytes": "235", - // "total_response_millis": "0" - // "total_cache_hits": "6", - // "total_cache_misses": "0", - - // Perhaps gotta run a second query to get all error responses - // "total_error_responses": "0", - // Same with archive requests - // "archive_request": 0, - - // Group by method if detailed, else just keep all methods as "null". i think influxdb takes care of that - // "method": null, - // "total_backend_retries": "0", + trace!("query time range: {:?} - {:?}", query_start, query_stop); + trace!("stats_column: {:?}", stats_column); + trace!("measurement: {:?}", measurement); + trace!("filters: {:?} {:?}", filter_field, filter_chain_id); + trace!("group: {:?}", group); + trace!("query_window_seconds: {:?}", query_window_seconds); let query = f!(r#" from(bucket: "{bucket}") @@ -201,11 +161,6 @@ pub async fn query_user_stats<'a>( let query = Query::new(query.to_string()); trace!("Query to db is: {:?}", query); - // TODO: do not unwrap. add this error to FrontErrorResponse - // TODO: StatType::Aggregated and StatType::Detailed might need different types - // let unparsed: serde_json::Value = serde_json::Value::Array(influxdb_client.query(Some(query.clone())).await?); - // trace!("Direct response is: {:?}", unparsed); - // Return a different result based on the query let datapoints = match stat_response_type { StatType::Aggregated => { @@ -217,21 +172,6 @@ pub async fn query_user_stats<'a>( trace!("Resp is: {:?}", res); } - // let tmp = influx_responses.into_iter().group_by(|x| {x.time.timestamp()}).into_iter().collect::>(); - // trace!("Printing grouped item {}", tmp); - - // Group by all fields together .. - // let influx_responses = Vec::new(); - // let grouped_items = Vec::new(); - - // let mut grouped_items = influx_responses - // .into_iter() - // .map(|x| { - // (x.time.clone(), x) - // }) - // .into_group_map(); - // trace!("Grouped items are {:?}", grouped_items); - influx_responses .into_iter() .map(|x| (x._time, x)) @@ -384,7 +324,6 @@ pub async fn query_user_stats<'a>( if !out.contains_key("query_window_timestamp") { out.insert( "query_window_timestamp".to_owned(), - // serde_json::Value::Number(x.time.timestamp().into()) json!(x._time.timestamp()), ); } @@ -442,8 +381,6 @@ pub async fn query_user_stats<'a>( if user_id == 0 { // 0 means everyone. don't filter on user } else { - // q = q.left_join(rpc_key::Entity); - // condition = condition.add(rpc_key::Column::UserId.eq(user_id)); response_body.insert("user_id", serde_json::Value::Number(user_id.into())); }