actually return a json result from influx

This commit is contained in:
Bryan Stitt 2023-02-21 22:04:47 -08:00 committed by yenicelik
parent 163bbbafca
commit c7dcc4aac3
3 changed files with 27 additions and 11 deletions

@ -11,7 +11,7 @@ use axum::{
use derive_more::From; use derive_more::From;
use http::header::InvalidHeaderValue; use http::header::InvalidHeaderValue;
use ipnet::AddrParseError; use ipnet::AddrParseError;
use log::{debug, trace, warn}; use log::{debug, error, trace, warn};
use migration::sea_orm::DbErr; use migration::sea_orm::DbErr;
use redis_rate_limiter::redis::RedisError; use redis_rate_limiter::redis::RedisError;
use reqwest::header::ToStrError; use reqwest::header::ToStrError;
@ -30,6 +30,7 @@ pub enum FrontendErrorResponse {
Database(DbErr), Database(DbErr),
Headers(headers::Error), Headers(headers::Error),
HeaderToString(ToStrError), HeaderToString(ToStrError),
InfluxDb2RequestError(influxdb2::RequestError),
InvalidHeaderValue(InvalidHeaderValue), InvalidHeaderValue(InvalidHeaderValue),
IpAddrParse(AddrParseError), IpAddrParse(AddrParseError),
JoinError(JoinError), JoinError(JoinError),
@ -85,7 +86,7 @@ impl FrontendErrorResponse {
) )
} }
Self::Database(err) => { Self::Database(err) => {
warn!("database err={:?}", err); error!("database err={:?}", err);
( (
StatusCode::INTERNAL_SERVER_ERROR, StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcForwardedResponse::from_str( JsonRpcForwardedResponse::from_str(
@ -106,6 +107,18 @@ impl FrontendErrorResponse {
), ),
) )
} }
Self::InfluxDb2RequestError(err) => {
// TODO: attach a request id to the message and to this error so that if people report problems, we can dig in sentry to find out more
error!("influxdb2 err={:?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcForwardedResponse::from_str(
"influxdb2 error!",
Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
None,
),
)
}
Self::IpAddrParse(err) => { Self::IpAddrParse(err) => {
warn!("IpAddrParse err={:?}", err); warn!("IpAddrParse err={:?}", err);
( (

@ -5,7 +5,7 @@ use crate::app::Web3ProxyApp;
use crate::http_params::{ use crate::http_params::{
get_chain_id_from_params, get_page_from_params, get_query_start_from_params, get_chain_id_from_params, get_page_from_params, get_query_start_from_params,
}; };
use crate::stats::db_queries::query_user_stats; use crate::stats::influxdb_queries::query_user_stats;
use crate::stats::StatType; use crate::stats::StatType;
use crate::user_token::UserBearerToken; use crate::user_token::UserBearerToken;
use crate::{PostLogin, PostLoginQuery}; use crate::{PostLogin, PostLoginQuery};

@ -3,24 +3,26 @@ use crate::{
app::Web3ProxyApp, app::Web3ProxyApp,
frontend::errors::FrontendErrorResponse, frontend::errors::FrontendErrorResponse,
http_params::{ http_params::{
get_chain_id_from_params, get_page_from_params, get_query_start_from_params, get_chain_id_from_params, get_query_start_from_params, get_query_stop_from_params,
get_query_stop_from_params, get_query_window_seconds_from_params, get_user_id_from_params, get_query_window_seconds_from_params, get_user_id_from_params,
}, },
}; };
use anyhow::Context; use anyhow::Context;
use axum::{ use axum::{
headers::{authorization::Bearer, Authorization}, headers::{authorization::Bearer, Authorization},
response::Response, response::{IntoResponse, Response},
TypedHeader, Json, TypedHeader,
}; };
use chrono::{DateTime, FixedOffset}; use chrono::{DateTime, FixedOffset};
use fstrings::{f, format_args_f}; use fstrings::{f, format_args_f};
use hashbrown::HashMap; use hashbrown::HashMap;
use influxdb2::models::Query; use influxdb2::models::Query;
use influxdb2::FromDataPoint; use influxdb2::FromDataPoint;
use serde::Serialize;
use serde_json::json; use serde_json::json;
#[derive(Debug, Default, FromDataPoint)] // TODO: include chain_id, method, and some other things in this struct
#[derive(Debug, Default, FromDataPoint, Serialize)]
pub struct AggregatedRpcAccounting { pub struct AggregatedRpcAccounting {
field: String, field: String,
value: f64, value: f64,
@ -57,7 +59,6 @@ pub async fn query_user_stats<'a>(
let query_start = get_query_start_from_params(params)?.timestamp(); let query_start = get_query_start_from_params(params)?.timestamp();
let query_stop = get_query_stop_from_params(params)?.timestamp(); let query_stop = get_query_stop_from_params(params)?.timestamp();
let chain_id = get_chain_id_from_params(app, params)?; let chain_id = get_chain_id_from_params(app, params)?;
let page = get_page_from_params(params)?;
let measurement = if user_id == 0 { let measurement = if user_id == 0 {
"global_proxy" "global_proxy"
@ -99,9 +100,11 @@ pub async fn query_user_stats<'a>(
|> yield(name: "mean") |> yield(name: "mean")
"#); "#);
let query = Query::new(qs.to_string()); let query = Query::new(query.to_string());
// TODO: do not unwrap. add this error to FrontErrorResponse
// TODO: StatType::Aggregated and StatType::Detailed might need different types
let res: Vec<AggregatedRpcAccounting> = influxdb_client.query(Some(query)).await?; let res: Vec<AggregatedRpcAccounting> = influxdb_client.query(Some(query)).await?;
todo!(); Ok(Json(json!(res)).into_response())
} }