will continue with migration

This commit is contained in:
yenicelik 2023-03-20 12:22:30 +01:00
parent c925ec9c27
commit 0a6ccf28b5
2 changed files with 144 additions and 113 deletions

90
Cargo.lock generated
View File

@ -142,12 +142,6 @@ version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64cb94155d965e3d37ffbbe7cc5b82c3dd79dd33bd48e536f73d2cfb8d85506f" checksum = "64cb94155d965e3d37ffbbe7cc5b82c3dd79dd33bd48e536f73d2cfb8d85506f"
[[package]]
name = "arrayvec"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b"
[[package]] [[package]]
name = "arrayvec" name = "arrayvec"
version = "0.7.2" version = "0.7.2"
@ -504,28 +498,16 @@ dependencies = [
"radium 0.3.0", "radium 0.3.0",
] ]
[[package]]
name = "bitvec"
version = "0.19.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55f93d0ef3363c364d5976646a38f04cf67cfe1d4c8d160cdea02cab2c116b33"
dependencies = [
"funty 1.1.0",
"radium 0.5.3",
"tap",
"wyz 0.2.0",
]
[[package]] [[package]]
name = "bitvec" name = "bitvec"
version = "1.0.1" version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c" checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c"
dependencies = [ dependencies = [
"funty 2.0.0", "funty",
"radium 0.7.0", "radium 0.7.0",
"tap", "tap",
"wyz 0.5.1", "wyz",
] ]
[[package]] [[package]]
@ -1683,7 +1665,7 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ade3e9c97727343984e1ceada4fdab11142d2ee3472d2c67027d56b1251d4f15" checksum = "ade3e9c97727343984e1ceada4fdab11142d2ee3472d2c67027d56b1251d4f15"
dependencies = [ dependencies = [
"arrayvec 0.7.2", "arrayvec",
"bytes", "bytes",
"cargo_metadata 0.15.3", "cargo_metadata 0.15.3",
"chrono", "chrono",
@ -2013,12 +1995,6 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "funty"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fed34cd105917e91daa4da6b3728c47b068749d6a62c59811f06ed2ac71d9da7"
[[package]] [[package]]
name = "funty" name = "funty"
version = "2.0.0" version = "2.0.0"
@ -2326,7 +2302,7 @@ dependencies = [
"byteorder", "byteorder",
"crossbeam-channel", "crossbeam-channel",
"flate2", "flate2",
"nom 7.1.3", "nom",
"num-traits", "num-traits",
] ]
@ -2643,9 +2619,9 @@ dependencies = [
[[package]] [[package]]
name = "influxdb2" name = "influxdb2"
version = "0.3.5" version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9adea4aa306353d8cdc2920bf9206af2c37275fe51835ab61e06fa3c5fbf14e" checksum = "c239bb83aa8f411697335c63a608fe46f75a6f68898abc3cbcd813ae1f6fb329"
dependencies = [ dependencies = [
"base64 0.13.1", "base64 0.13.1",
"bytes", "bytes",
@ -2657,7 +2633,7 @@ dependencies = [
"go-parse-duration", "go-parse-duration",
"influxdb2-derive", "influxdb2-derive",
"influxdb2-structmap", "influxdb2-structmap",
"nom 6.1.2", "nom",
"opentelemetry", "opentelemetry",
"ordered-float", "ordered-float",
"parking_lot 0.11.2", "parking_lot 0.11.2",
@ -2675,9 +2651,9 @@ dependencies = [
[[package]] [[package]]
name = "influxdb2-derive" name = "influxdb2-derive"
version = "0.1.0" version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1e007e3c8368af353f58831a0fdb1b6649df4a8f0a33aa6455fc69a896bbc30" checksum = "990f899841aa30130fc06f7938e3cc2cbc3d5b92c03fd4b5d79a965045abcf16"
dependencies = [ dependencies = [
"itertools", "itertools",
"proc-macro2", "proc-macro2",
@ -2862,19 +2838,6 @@ dependencies = [
"spin 0.5.2", "spin 0.5.2",
] ]
[[package]]
name = "lexical-core"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6607c62aa161d23d17a9072cc5da0be67cdfc89d3afb1e8d9c842bebc2525ffe"
dependencies = [
"arrayvec 0.5.2",
"bitflags",
"cfg-if",
"ryu",
"static_assertions",
]
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.139" version = "0.2.139"
@ -3086,19 +3049,6 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4a24736216ec316047a1fc4252e27dabb04218aa4a3f37c6e7ddbf1f9782b54" checksum = "e4a24736216ec316047a1fc4252e27dabb04218aa4a3f37c6e7ddbf1f9782b54"
[[package]]
name = "nom"
version = "6.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7413f999671bd4745a7b624bd370a569fb6bc574b23c83a3c5ed2e453f3d5e2"
dependencies = [
"bitvec 0.19.6",
"funty 1.1.0",
"lexical-core",
"memchr",
"version_check",
]
[[package]] [[package]]
name = "nom" name = "nom"
version = "7.1.3" version = "7.1.3"
@ -3288,7 +3238,7 @@ version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "786393f80485445794f6043fd3138854dd109cc6c4bd1a6383db304c9ce9b9ce" checksum = "786393f80485445794f6043fd3138854dd109cc6c4bd1a6383db304c9ce9b9ce"
dependencies = [ dependencies = [
"arrayvec 0.7.2", "arrayvec",
"auto_impl 1.0.1", "auto_impl 1.0.1",
"bytes", "bytes",
"ethereum-types", "ethereum-types",
@ -3439,7 +3389,7 @@ version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "637935964ff85a605d114591d4d2c13c5d1ba2806dae97cea6bf180238a749ac" checksum = "637935964ff85a605d114591d4d2c13c5d1ba2806dae97cea6bf180238a749ac"
dependencies = [ dependencies = [
"arrayvec 0.7.2", "arrayvec",
"bitvec 1.0.1", "bitvec 1.0.1",
"byte-slice-cast", "byte-slice-cast",
"impl-trait-for-tuples", "impl-trait-for-tuples",
@ -3919,12 +3869,6 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "def50a86306165861203e7f84ecffbbdfdea79f0e51039b33de1e952358c47ac" checksum = "def50a86306165861203e7f84ecffbbdfdea79f0e51039b33de1e952358c47ac"
[[package]]
name = "radium"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "941ba9d78d8e2f7ce474c015eea4d9c6d25b6a3327f9832ee29a4de27f91bbb8"
[[package]] [[package]]
name = "radium" name = "radium"
version = "0.7.0" version = "0.7.0"
@ -4302,7 +4246,7 @@ version = "1.28.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e13cf35f7140155d02ba4ec3294373d513a3c7baa8364c162b030e33c61520a8" checksum = "e13cf35f7140155d02ba4ec3294373d513a3c7baa8364c162b030e33c61520a8"
dependencies = [ dependencies = [
"arrayvec 0.7.2", "arrayvec",
"borsh", "borsh",
"bytecheck", "bytecheck",
"byteorder", "byteorder",
@ -4882,7 +4826,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c1a4ca38f4e746460d1dbd3711b8ca8ae314d1b21247edeff61dd20325b5a6f" checksum = "9c1a4ca38f4e746460d1dbd3711b8ca8ae314d1b21247edeff61dd20325b5a6f"
dependencies = [ dependencies = [
"heapless", "heapless",
"nom 7.1.3", "nom",
"serde", "serde",
"serde_plain", "serde_plain",
"thiserror", "thiserror",
@ -5154,7 +5098,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c12bc9199d1db8234678b7051747c07f517cdcf019262d1847b94ec8b1aee3e" checksum = "0c12bc9199d1db8234678b7051747c07f517cdcf019262d1847b94ec8b1aee3e"
dependencies = [ dependencies = [
"itertools", "itertools",
"nom 7.1.3", "nom",
"unicode_categories", "unicode_categories",
] ]
@ -6499,12 +6443,6 @@ dependencies = [
"web-sys", "web-sys",
] ]
[[package]]
name = "wyz"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85e60b0d1b5f99db2556934e21937020776a5d31520bf169e851ac44e6420214"
[[package]] [[package]]
name = "wyz" name = "wyz"
version = "0.5.1" version = "0.5.1"

View File

@ -19,30 +19,33 @@ use hashbrown::HashMap;
use influxdb2::models::Query; use influxdb2::models::Query;
use influxdb2::FromDataPoint; use influxdb2::FromDataPoint;
use itertools::Itertools; use itertools::Itertools;
use log::info; use log::{info, warn};
use serde::Serialize; use serde::Serialize;
use serde_json::{json}; use serde_json::{json, Number, Value};
use entities::{rpc_accounting, rpc_key}; use entities::{rpc_accounting, rpc_key};
use crate::http_params::get_stats_column_from_params; use crate::http_params::get_stats_column_from_params;
// This type-API is extremely brittle! Make sure that the types conform 1-to-1 as defined here
// https://docs.rs/influxdb2-structmap/0.2.0/src/influxdb2_structmap/value.rs.html#1-98
// TODO: Run rustformat on it to see what the compiled produces for this
#[derive(Debug, Default, FromDataPoint, Serialize)] #[derive(Debug, Default, FromDataPoint, Serialize)]
pub struct AggregatedRpcAccounting { pub struct AggregatedRpcAccounting {
chain_id: u64, chain_id: String,
field: String, _field: String,
value: f64, _value: i64,
time: DateTime<FixedOffset>, _time: DateTime<FixedOffset>,
error_response: bool, error_response: String,
archive_needed: bool, archive_needed: String,
} }
#[derive(Debug, Default, FromDataPoint, Serialize)] #[derive(Debug, Default, FromDataPoint, Serialize)]
pub struct DetailedRpcAccounting { pub struct DetailedRpcAccounting {
chain_id: u64, chain_id: String,
field: String, _field: String,
value: f64, _value: i64,
time: DateTime<FixedOffset>, _time: DateTime<FixedOffset>,
error_response: bool, error_response: String,
archive_needed: bool, archive_needed: String,
method: String, method: String,
} }
@ -189,7 +192,7 @@ pub async fn query_user_stats<'a>(
{filter_field} {filter_field}
{filter_chain_id} {filter_chain_id}
{group} {group}
|> aggregateWindow(every: {query_window_seconds}s, fn: mean, createEmpty: false) |> aggregateWindow(every: {query_window_seconds}s, fn: sum, createEmpty: false)
|> group() |> group()
"#); "#);
@ -230,7 +233,7 @@ pub async fn query_user_stats<'a>(
influx_responses influx_responses
.into_iter() .into_iter()
.map(|x| { .map(|x| {
(x.time.clone(), x) (x._time.clone(), x)
}) })
.into_group_map() .into_group_map()
.into_iter() .into_iter()
@ -250,27 +253,72 @@ pub async fn query_user_stats<'a>(
for x in grouped_items { for x in grouped_items {
info!("Iterating over grouped item {:?}", x); info!("Iterating over grouped item {:?}", x);
out.insert(
f!(r#"total_{x.field}"#), let key = format!("total_{}", x._field).to_string();
// serde_json::Value::Number(serde_json::Number::from(x.value)) info!("Looking at: {:?}", key);
json!(x.value)
); // Insert it once, and then fix it
match out.get_mut(&key) {
Some (existing) => {
match existing {
Value::Number(old_value) => {
// unwrap will error when someone has too many credits ..
let old_value = old_value.as_i64().unwrap();
warn!("Old value is {:?}", old_value);
*existing = serde_json::Value::Number(Number::from(old_value + x._value));
warn!("New value is {:?}", old_value);
},
_ => {panic!("Should be nothing but a number")}
};
}
None => {
warn!("Does not exist yet! Insert new!");
out.insert(
key,
serde_json::Value::Number(Number::from(x._value))
);
}
};
if !out.contains_key("query_window_timestamp") { if !out.contains_key("query_window_timestamp") {
out.insert( out.insert(
"query_window_timestamp".to_owned(), "query_window_timestamp".to_owned(),
// serde_json::Value::Number(x.time.timestamp().into()) // serde_json::Value::Number(x.time.timestamp().into())
json!(x.time.timestamp()) json!(x._time.timestamp())
); );
} }
// Interpret archive needed as a boolean
let archive_needed = match x.archive_needed.as_str() {
"true" => {
true
},
"false" => {
false
},
_ => {
panic!("This should never be!")
}
};
let error_response = match x.error_response.as_str() {
"true" => {
true
},
"false" => {
false
},
_ => {
panic!("This should never be!")
}
};
// Add up to archive requests and error responses // Add up to archive requests and error responses
// TODO: Gotta double check if errors & archive is based on frontend requests, or other metrics // TODO: Gotta double check if errors & archive is based on frontend requests, or other metrics
if x.field == "frontend_requests" && x.archive_needed { if x._field == "frontend_requests" && archive_needed {
archive_requests += x.value as i32 // This is the number of requests archive_requests += x._value as u64 // This is the number of requests
} }
if x.field == "frontend_requests" && x.error_response { if x._field == "frontend_requests" && error_response {
error_responses += x.value as i32 error_responses += x._value as u64
} }
} }
@ -294,7 +342,7 @@ pub async fn query_user_stats<'a>(
influx_responses influx_responses
.into_iter() .into_iter()
.map(|x| { .map(|x| {
((x.time.clone(), x.method.clone()), x) ((x._time.clone(), x.method.clone()), x)
}) })
.into_group_map() .into_group_map()
.into_iter() .into_iter()
@ -313,27 +361,72 @@ pub async fn query_user_stats<'a>(
for x in grouped_items { for x in grouped_items {
info!("Iterating over grouped item {:?}", x); info!("Iterating over grouped item {:?}", x);
out.insert(
f!(r#"total_{x.field}"#), let key = format!("total_{}", x._field).to_string();
// serde_json::Value::Number(serde_json::Number::from(x.value)) info!("Looking at: {:?}", key);
json!(x.value)
); // Insert it once, and then fix it
match out.get_mut(&key) {
Some (existing) => {
match existing {
Value::Number(old_value) => {
// unwrap will error when someone has too many credits ..
let old_value = old_value.as_i64().unwrap();
warn!("Old value is {:?}", old_value);
*existing = serde_json::Value::Number(Number::from(old_value + x._value));
warn!("New value is {:?}", old_value);
},
_ => {panic!("Should be nothing but a number")}
};
}
None => {
warn!("Does not exist yet! Insert new!");
out.insert(
key,
serde_json::Value::Number(Number::from(x._value))
);
}
};
if !out.contains_key("query_window_timestamp") { if !out.contains_key("query_window_timestamp") {
out.insert( out.insert(
"query_window_timestamp".to_owned(), "query_window_timestamp".to_owned(),
// serde_json::Value::Number(x.time.timestamp().into()) // serde_json::Value::Number(x.time.timestamp().into())
json!(x.time.timestamp()) json!(x._time.timestamp())
); );
} }
// Interpret archive needed as a boolean
let archive_needed = match x.archive_needed.as_str() {
"true" => {
true
},
"false" => {
false
},
_ => {
panic!("This should never be!")
}
};
let error_response = match x.error_response.as_str() {
"true" => {
true
},
"false" => {
false
},
_ => {
panic!("This should never be!")
}
};
// Add up to archive requests and error responses // Add up to archive requests and error responses
// TODO: Gotta double check if errors & archive is based on frontend requests, or other metrics // TODO: Gotta double check if errors & archive is based on frontend requests, or other metrics
if x.field == "frontend_requests" && x.archive_needed { if x._field == "frontend_requests" && archive_needed {
archive_requests += x.value as i32 // This is the number of requests archive_requests += x._value as i32 // This is the number of requests
} }
if x.field == "frontend_requests" && x.error_response { if x._field == "frontend_requests" && error_response {
error_responses += x.value as i32 error_responses += x._value as i32
} }
} }