From 0a6ccf28b52f27f039f59a0bd11912bac5601b76 Mon Sep 17 00:00:00 2001 From: yenicelik Date: Mon, 20 Mar 2023 12:22:30 +0100 Subject: [PATCH] will continue with migration --- Cargo.lock | 90 ++---------- web3_proxy/src/stats/influxdb_queries.rs | 167 ++++++++++++++++++----- 2 files changed, 144 insertions(+), 113 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7e90972a..c7f4d6dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -142,12 +142,6 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64cb94155d965e3d37ffbbe7cc5b82c3dd79dd33bd48e536f73d2cfb8d85506f" -[[package]] -name = "arrayvec" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" - [[package]] name = "arrayvec" version = "0.7.2" @@ -504,28 +498,16 @@ dependencies = [ "radium 0.3.0", ] -[[package]] -name = "bitvec" -version = "0.19.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55f93d0ef3363c364d5976646a38f04cf67cfe1d4c8d160cdea02cab2c116b33" -dependencies = [ - "funty 1.1.0", - "radium 0.5.3", - "tap", - "wyz 0.2.0", -] - [[package]] name = "bitvec" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c" dependencies = [ - "funty 2.0.0", + "funty", "radium 0.7.0", "tap", - "wyz 0.5.1", + "wyz", ] [[package]] @@ -1683,7 +1665,7 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ade3e9c97727343984e1ceada4fdab11142d2ee3472d2c67027d56b1251d4f15" dependencies = [ - "arrayvec 0.7.2", + "arrayvec", "bytes", "cargo_metadata 0.15.3", "chrono", @@ -2013,12 +1995,6 @@ dependencies = [ "syn", ] -[[package]] -name = "funty" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fed34cd105917e91daa4da6b3728c47b068749d6a62c59811f06ed2ac71d9da7" - [[package]] name = "funty" version = "2.0.0" @@ -2326,7 +2302,7 @@ dependencies = [ "byteorder", "crossbeam-channel", "flate2", - "nom 7.1.3", + "nom", "num-traits", ] @@ -2643,9 +2619,9 @@ dependencies = [ [[package]] name = "influxdb2" -version = "0.3.5" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9adea4aa306353d8cdc2920bf9206af2c37275fe51835ab61e06fa3c5fbf14e" +checksum = "c239bb83aa8f411697335c63a608fe46f75a6f68898abc3cbcd813ae1f6fb329" dependencies = [ "base64 0.13.1", "bytes", @@ -2657,7 +2633,7 @@ dependencies = [ "go-parse-duration", "influxdb2-derive", "influxdb2-structmap", - "nom 6.1.2", + "nom", "opentelemetry", "ordered-float", "parking_lot 0.11.2", @@ -2675,9 +2651,9 @@ dependencies = [ [[package]] name = "influxdb2-derive" -version = "0.1.0" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1e007e3c8368af353f58831a0fdb1b6649df4a8f0a33aa6455fc69a896bbc30" +checksum = "990f899841aa30130fc06f7938e3cc2cbc3d5b92c03fd4b5d79a965045abcf16" dependencies = [ "itertools", "proc-macro2", @@ -2862,19 +2838,6 @@ dependencies = [ "spin 0.5.2", ] -[[package]] -name = "lexical-core" -version = "0.7.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6607c62aa161d23d17a9072cc5da0be67cdfc89d3afb1e8d9c842bebc2525ffe" -dependencies = [ - "arrayvec 0.5.2", - "bitflags", - "cfg-if", - "ryu", - "static_assertions", -] - [[package]] name = "libc" version = "0.2.139" @@ -3086,19 +3049,6 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e4a24736216ec316047a1fc4252e27dabb04218aa4a3f37c6e7ddbf1f9782b54" -[[package]] -name = "nom" -version = "6.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7413f999671bd4745a7b624bd370a569fb6bc574b23c83a3c5ed2e453f3d5e2" -dependencies = [ - "bitvec 0.19.6", - "funty 1.1.0", - "lexical-core", - "memchr", - "version_check", -] - [[package]] name = "nom" version = "7.1.3" @@ -3288,7 +3238,7 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "786393f80485445794f6043fd3138854dd109cc6c4bd1a6383db304c9ce9b9ce" dependencies = [ - "arrayvec 0.7.2", + "arrayvec", "auto_impl 1.0.1", "bytes", "ethereum-types", @@ -3439,7 +3389,7 @@ version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "637935964ff85a605d114591d4d2c13c5d1ba2806dae97cea6bf180238a749ac" dependencies = [ - "arrayvec 0.7.2", + "arrayvec", "bitvec 1.0.1", "byte-slice-cast", "impl-trait-for-tuples", @@ -3919,12 +3869,6 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "def50a86306165861203e7f84ecffbbdfdea79f0e51039b33de1e952358c47ac" -[[package]] -name = "radium" -version = "0.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "941ba9d78d8e2f7ce474c015eea4d9c6d25b6a3327f9832ee29a4de27f91bbb8" - [[package]] name = "radium" version = "0.7.0" @@ -4302,7 +4246,7 @@ version = "1.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e13cf35f7140155d02ba4ec3294373d513a3c7baa8364c162b030e33c61520a8" dependencies = [ - "arrayvec 0.7.2", + "arrayvec", "borsh", "bytecheck", "byteorder", @@ -4882,7 +4826,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c1a4ca38f4e746460d1dbd3711b8ca8ae314d1b21247edeff61dd20325b5a6f" dependencies = [ "heapless", - "nom 7.1.3", + "nom", "serde", "serde_plain", "thiserror", @@ -5154,7 +5098,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c12bc9199d1db8234678b7051747c07f517cdcf019262d1847b94ec8b1aee3e" dependencies = [ "itertools", - "nom 7.1.3", + "nom", "unicode_categories", ] @@ -6499,12 +6443,6 @@ dependencies = [ "web-sys", ] -[[package]] -name = "wyz" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85e60b0d1b5f99db2556934e21937020776a5d31520bf169e851ac44e6420214" - [[package]] name = "wyz" version = "0.5.1" diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs index c6ac31de..208cd0fb 100644 --- a/web3_proxy/src/stats/influxdb_queries.rs +++ b/web3_proxy/src/stats/influxdb_queries.rs @@ -19,30 +19,33 @@ use hashbrown::HashMap; use influxdb2::models::Query; use influxdb2::FromDataPoint; use itertools::Itertools; -use log::info; +use log::{info, warn}; use serde::Serialize; -use serde_json::{json}; +use serde_json::{json, Number, Value}; use entities::{rpc_accounting, rpc_key}; use crate::http_params::get_stats_column_from_params; +// This type-API is extremely brittle! Make sure that the types conform 1-to-1 as defined here +// https://docs.rs/influxdb2-structmap/0.2.0/src/influxdb2_structmap/value.rs.html#1-98 +// TODO: Run rustformat on it to see what the compiled produces for this #[derive(Debug, Default, FromDataPoint, Serialize)] pub struct AggregatedRpcAccounting { - chain_id: u64, - field: String, - value: f64, - time: DateTime, - error_response: bool, - archive_needed: bool, + chain_id: String, + _field: String, + _value: i64, + _time: DateTime, + error_response: String, + archive_needed: String, } #[derive(Debug, Default, FromDataPoint, Serialize)] pub struct DetailedRpcAccounting { - chain_id: u64, - field: String, - value: f64, - time: DateTime, - error_response: bool, - archive_needed: bool, + chain_id: String, + _field: String, + _value: i64, + _time: DateTime, + error_response: String, + archive_needed: String, method: String, } @@ -189,7 +192,7 @@ pub async fn query_user_stats<'a>( {filter_field} {filter_chain_id} {group} - |> aggregateWindow(every: {query_window_seconds}s, fn: mean, createEmpty: false) + |> aggregateWindow(every: {query_window_seconds}s, fn: sum, createEmpty: false) |> group() "#); @@ -230,7 +233,7 @@ pub async fn query_user_stats<'a>( influx_responses .into_iter() .map(|x| { - (x.time.clone(), x) + (x._time.clone(), x) }) .into_group_map() .into_iter() @@ -250,27 +253,72 @@ pub async fn query_user_stats<'a>( for x in grouped_items { info!("Iterating over grouped item {:?}", x); - out.insert( - f!(r#"total_{x.field}"#), - // serde_json::Value::Number(serde_json::Number::from(x.value)) - json!(x.value) - ); + + let key = format!("total_{}", x._field).to_string(); + info!("Looking at: {:?}", key); + + // Insert it once, and then fix it + match out.get_mut(&key) { + Some (existing) => { + match existing { + Value::Number(old_value) => { + // unwrap will error when someone has too many credits .. + let old_value = old_value.as_i64().unwrap(); + warn!("Old value is {:?}", old_value); + *existing = serde_json::Value::Number(Number::from(old_value + x._value)); + warn!("New value is {:?}", old_value); + }, + _ => {panic!("Should be nothing but a number")} + }; + } + None => { + warn!("Does not exist yet! Insert new!"); + out.insert( + key, + serde_json::Value::Number(Number::from(x._value)) + ); + } + }; if !out.contains_key("query_window_timestamp") { out.insert( "query_window_timestamp".to_owned(), // serde_json::Value::Number(x.time.timestamp().into()) - json!(x.time.timestamp()) + json!(x._time.timestamp()) ); } + // Interpret archive needed as a boolean + let archive_needed = match x.archive_needed.as_str() { + "true" => { + true + }, + "false" => { + false + }, + _ => { + panic!("This should never be!") + } + }; + let error_response = match x.error_response.as_str() { + "true" => { + true + }, + "false" => { + false + }, + _ => { + panic!("This should never be!") + } + }; + // Add up to archive requests and error responses // TODO: Gotta double check if errors & archive is based on frontend requests, or other metrics - if x.field == "frontend_requests" && x.archive_needed { - archive_requests += x.value as i32 // This is the number of requests + if x._field == "frontend_requests" && archive_needed { + archive_requests += x._value as u64 // This is the number of requests } - if x.field == "frontend_requests" && x.error_response { - error_responses += x.value as i32 + if x._field == "frontend_requests" && error_response { + error_responses += x._value as u64 } } @@ -294,7 +342,7 @@ pub async fn query_user_stats<'a>( influx_responses .into_iter() .map(|x| { - ((x.time.clone(), x.method.clone()), x) + ((x._time.clone(), x.method.clone()), x) }) .into_group_map() .into_iter() @@ -313,27 +361,72 @@ pub async fn query_user_stats<'a>( for x in grouped_items { info!("Iterating over grouped item {:?}", x); - out.insert( - f!(r#"total_{x.field}"#), - // serde_json::Value::Number(serde_json::Number::from(x.value)) - json!(x.value) - ); + + let key = format!("total_{}", x._field).to_string(); + info!("Looking at: {:?}", key); + + // Insert it once, and then fix it + match out.get_mut(&key) { + Some (existing) => { + match existing { + Value::Number(old_value) => { + // unwrap will error when someone has too many credits .. + let old_value = old_value.as_i64().unwrap(); + warn!("Old value is {:?}", old_value); + *existing = serde_json::Value::Number(Number::from(old_value + x._value)); + warn!("New value is {:?}", old_value); + }, + _ => {panic!("Should be nothing but a number")} + }; + } + None => { + warn!("Does not exist yet! Insert new!"); + out.insert( + key, + serde_json::Value::Number(Number::from(x._value)) + ); + } + }; if !out.contains_key("query_window_timestamp") { out.insert( "query_window_timestamp".to_owned(), // serde_json::Value::Number(x.time.timestamp().into()) - json!(x.time.timestamp()) + json!(x._time.timestamp()) ); } + // Interpret archive needed as a boolean + let archive_needed = match x.archive_needed.as_str() { + "true" => { + true + }, + "false" => { + false + }, + _ => { + panic!("This should never be!") + } + }; + let error_response = match x.error_response.as_str() { + "true" => { + true + }, + "false" => { + false + }, + _ => { + panic!("This should never be!") + } + }; + // Add up to archive requests and error responses // TODO: Gotta double check if errors & archive is based on frontend requests, or other metrics - if x.field == "frontend_requests" && x.archive_needed { - archive_requests += x.value as i32 // This is the number of requests + if x._field == "frontend_requests" && archive_needed { + archive_requests += x._value as i32 // This is the number of requests } - if x.field == "frontend_requests" && x.error_response { - error_responses += x.value as i32 + if x._field == "frontend_requests" && error_response { + error_responses += x._value as i32 } }