Merge branch 'stats_v3' of github.com:yenicelik/web3-proxy into Web3ProxyError
commit b012f18b2b

Cargo.lock (generated, 90 lines changed)
@@ -142,12 +142,6 @@ version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64cb94155d965e3d37ffbbe7cc5b82c3dd79dd33bd48e536f73d2cfb8d85506f"

-[[package]]
-name = "arrayvec"
-version = "0.5.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b"
-
[[package]]
name = "arrayvec"
version = "0.7.2"
@@ -504,28 +498,16 @@ dependencies = [
 "radium 0.3.0",
]

-[[package]]
-name = "bitvec"
-version = "0.19.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "55f93d0ef3363c364d5976646a38f04cf67cfe1d4c8d160cdea02cab2c116b33"
-dependencies = [
- "funty 1.1.0",
- "radium 0.5.3",
- "tap",
- "wyz 0.2.0",
-]
-
[[package]]
name = "bitvec"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c"
dependencies = [
- "funty 2.0.0",
+ "funty",
 "radium 0.7.0",
 "tap",
- "wyz 0.5.1",
+ "wyz",
]

[[package]]
@@ -1683,7 +1665,7 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ade3e9c97727343984e1ceada4fdab11142d2ee3472d2c67027d56b1251d4f15"
dependencies = [
- "arrayvec 0.7.2",
+ "arrayvec",
 "bytes",
 "cargo_metadata 0.15.3",
 "chrono",
@@ -2013,12 +1995,6 @@ dependencies = [
 "syn",
]

-[[package]]
-name = "funty"
-version = "1.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fed34cd105917e91daa4da6b3728c47b068749d6a62c59811f06ed2ac71d9da7"
-
[[package]]
name = "funty"
version = "2.0.0"
@@ -2326,7 +2302,7 @@ dependencies = [
 "byteorder",
 "crossbeam-channel",
 "flate2",
- "nom 7.1.3",
+ "nom",
 "num-traits",
]

@@ -2643,9 +2619,9 @@ dependencies = [

[[package]]
name = "influxdb2"
-version = "0.3.5"
+version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f9adea4aa306353d8cdc2920bf9206af2c37275fe51835ab61e06fa3c5fbf14e"
+checksum = "c239bb83aa8f411697335c63a608fe46f75a6f68898abc3cbcd813ae1f6fb329"
dependencies = [
 "base64 0.13.1",
 "bytes",
@@ -2657,7 +2633,7 @@ dependencies = [
 "go-parse-duration",
 "influxdb2-derive",
 "influxdb2-structmap",
- "nom 6.1.2",
+ "nom",
 "opentelemetry",
 "ordered-float",
 "parking_lot 0.11.2",
@@ -2675,9 +2651,9 @@ dependencies = [

[[package]]
name = "influxdb2-derive"
-version = "0.1.0"
+version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e1e007e3c8368af353f58831a0fdb1b6649df4a8f0a33aa6455fc69a896bbc30"
+checksum = "990f899841aa30130fc06f7938e3cc2cbc3d5b92c03fd4b5d79a965045abcf16"
dependencies = [
 "itertools",
 "proc-macro2",
@@ -2862,19 +2838,6 @@ dependencies = [
 "spin 0.5.2",
]

-[[package]]
-name = "lexical-core"
-version = "0.7.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6607c62aa161d23d17a9072cc5da0be67cdfc89d3afb1e8d9c842bebc2525ffe"
-dependencies = [
- "arrayvec 0.5.2",
- "bitflags",
- "cfg-if",
- "ryu",
- "static_assertions",
-]
-
[[package]]
name = "libc"
version = "0.2.139"
@@ -3086,19 +3049,6 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4a24736216ec316047a1fc4252e27dabb04218aa4a3f37c6e7ddbf1f9782b54"

-[[package]]
-name = "nom"
-version = "6.1.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e7413f999671bd4745a7b624bd370a569fb6bc574b23c83a3c5ed2e453f3d5e2"
-dependencies = [
- "bitvec 0.19.6",
- "funty 1.1.0",
- "lexical-core",
- "memchr",
- "version_check",
-]
-
[[package]]
name = "nom"
version = "7.1.3"
@@ -3288,7 +3238,7 @@ version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "786393f80485445794f6043fd3138854dd109cc6c4bd1a6383db304c9ce9b9ce"
dependencies = [
- "arrayvec 0.7.2",
+ "arrayvec",
 "auto_impl 1.0.1",
 "bytes",
 "ethereum-types",
@@ -3439,7 +3389,7 @@ version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "637935964ff85a605d114591d4d2c13c5d1ba2806dae97cea6bf180238a749ac"
dependencies = [
- "arrayvec 0.7.2",
+ "arrayvec",
 "bitvec 1.0.1",
 "byte-slice-cast",
 "impl-trait-for-tuples",
@@ -3919,12 +3869,6 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "def50a86306165861203e7f84ecffbbdfdea79f0e51039b33de1e952358c47ac"

-[[package]]
-name = "radium"
-version = "0.5.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "941ba9d78d8e2f7ce474c015eea4d9c6d25b6a3327f9832ee29a4de27f91bbb8"
-
[[package]]
name = "radium"
version = "0.7.0"
@@ -4302,7 +4246,7 @@ version = "1.28.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e13cf35f7140155d02ba4ec3294373d513a3c7baa8364c162b030e33c61520a8"
dependencies = [
- "arrayvec 0.7.2",
+ "arrayvec",
 "borsh",
 "bytecheck",
 "byteorder",
@@ -4882,7 +4826,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c1a4ca38f4e746460d1dbd3711b8ca8ae314d1b21247edeff61dd20325b5a6f"
dependencies = [
 "heapless",
- "nom 7.1.3",
+ "nom",
 "serde",
 "serde_plain",
 "thiserror",
@@ -5154,7 +5098,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c12bc9199d1db8234678b7051747c07f517cdcf019262d1847b94ec8b1aee3e"
dependencies = [
 "itertools",
- "nom 7.1.3",
+ "nom",
 "unicode_categories",
]

@@ -6499,12 +6443,6 @@ dependencies = [
 "web-sys",
]

-[[package]]
-name = "wyz"
-version = "0.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "85e60b0d1b5f99db2556934e21937020776a5d31520bf169e851ac44e6420214"
-
[[package]]
name = "wyz"
version = "0.5.1"
@@ -17,6 +17,7 @@ kafka_urls = "127.0.0.1:19092"
influxdb_host = "http://127.0.0.1:18086"
influxdb_org = "dev_org"
influxdb_token = "dev_web3_proxy_auth_token"
+influxdb_bucketname = "web3_proxy"

# thundering herd protection
# only mark a block as the head block if the sum of their soft limits is greater than or equal to min_sum_soft_limit
@@ -31,7 +31,7 @@ services:
      DOCKER_INFLUXDB_INIT_USERNAME: dev_web3_proxy
      DOCKER_INFLUXDB_INIT_PASSWORD: dev_web3_proxy
      DOCKER_INFLUXDB_INIT_ORG: dev_org
-      DOCKER_INFLUXDB_INIT_BUCKET: web3_proxy
+      DOCKER_INFLUXDB_INIT_BUCKET: dev_web3_proxy
      DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: dev_web3_proxy_auth_token
    ports:
      - 127.0.0.1:18086:8086
@@ -18,6 +18,7 @@ mod m20230130_124740_read_only_login_logic;
mod m20230130_165144_prepare_admin_imitation_pre_login;
mod m20230215_152254_admin_trail;
mod m20230125_204810_stats_v2;
+mod m20230307_002623_migrate_rpc_accounting_to_rpc_accounting_v2;

pub struct Migrator;

@@ -43,6 +44,7 @@ impl MigratorTrait for Migrator {
            Box::new(m20230130_165144_prepare_admin_imitation_pre_login::Migration),
            Box::new(m20230215_152254_admin_trail::Migration),
            Box::new(m20230125_204810_stats_v2::Migration),
+            Box::new(m20230307_002623_migrate_rpc_accounting_to_rpc_accounting_v2::Migration),
        ]
    }
}
migration/src/m20230307_002623_migrate_rpc_accounting_to_rpc_accounting_v2.rs (new file, 81 lines)
@@ -0,0 +1,81 @@
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        // Add a nullable timestamp column to the rpc_accounting table to track which rows have been migrated
        manager
            .alter_table(
                Table::alter()
                    .table(RpcAccounting::Table)
                    .add_column(ColumnDef::new(RpcAccounting::Migrated).timestamp())
                    .to_owned(),
            )
            .await
    }

    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .alter_table(
                Table::alter()
                    .table(RpcAccounting::Table)
                    .drop_column(RpcAccounting::Migrated)
                    .to_owned(),
            )
            .await
    }
}

/// Learn more at https://docs.rs/sea-query#iden
#[derive(Iden)]
pub enum UserKeys {
    Table,
    Id,
}

#[derive(Iden)]
enum RpcAccounting {
    Table,
    Id,
    UserKeyId,
    ChainId,
    Method,
    ErrorResponse,
    PeriodDatetime,
    FrontendRequests,
    BackendRequests,
    BackendRetries,
    NoServers,
    CacheMisses,
    CacheHits,
    SumRequestBytes,
    MinRequestBytes,
    MeanRequestBytes,
    P50RequestBytes,
    P90RequestBytes,
    P99RequestBytes,
    MaxRequestBytes,
    SumResponseMillis,
    MinResponseMillis,
    MeanResponseMillis,
    P50ResponseMillis,
    P90ResponseMillis,
    P99ResponseMillis,
    MaxResponseMillis,
    SumResponseBytes,
    MinResponseBytes,
    MeanResponseBytes,
    P50ResponseBytes,
    P90ResponseBytes,
    P99ResponseBytes,
    MaxResponseBytes,
    Migrated,
}
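The new nullable `migrated` column is what makes the batch migration introduced later in this diff restartable: rows that have been copied get a timestamp, and untouched rows stay NULL. As a minimal sketch (assuming the regenerated sea-orm entity exposes the column as `rpc_accounting::Column::Migrated`), selecting the next unmigrated batch could look like:

```rust
use entities::rpc_accounting;
use migration::sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect};

// hypothetical helper: fetch one batch of rows that have not been stamped yet
async fn next_unmigrated_batch(
    db_conn: &DatabaseConnection,
) -> anyhow::Result<Vec<rpc_accounting::Model>> {
    let batch = rpc_accounting::Entity::find()
        // the nullable timestamp column added by this migration
        .filter(rpc_accounting::Column::Migrated.is_null())
        .limit(10_000)
        .all(db_conn)
        .await?;

    Ok(batch)
}
```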
BIN scripts/ethspam (new executable file, binary not shown)

scripts/generate-requests-and-stats.sh (new file, 7 lines)
@@ -0,0 +1,7 @@
# Got ethspam from here

# Got versus from here
# https://github.com/INFURA/versus
# ./ethspam | ./versus --stop-after 100 "http://localhost:8544/" # Pipe into the endpoint ..., add a bearer token and all that

./ethspam http://127.0.0.1:8544 | ./versus --stop-after 100 http://localhost:8544
scripts/get-stats-aggregated.sh (new file, 10 lines)
@@ -0,0 +1,10 @@
# Make a GET request to fetch the stats in an aggregated fashion

# I don't think we need a user id ...


curl -X GET \
"http://localhost:8544/user/stats/aggregate?query_start=1678780033&query_window_seconds=1000"

#curl -X GET \
#"http://localhost:8544/user/stats/detailed?query_start=1678780033&query_window_seconds=1000"
BIN scripts/versus (new executable file, binary not shown)
@@ -576,6 +576,7 @@ impl Web3ProxyApp {
    // stats can be saved in mysql, influxdb, both, or none
    let stat_sender = if let Some(emitter_spawn) = StatBuffer::try_spawn(
        top_config.app.chain_id,
+        top_config.app.influxdb_bucket.clone().context("No influxdb bucket was provided")?.to_owned(),
        db_conn.clone(),
        influxdb_client.clone(),
        60,
@@ -9,6 +9,7 @@ mod create_key;
mod create_user;
mod drop_migration_lock;
mod list_user_tier;
+mod migrate_stats_to_v2;
mod pagerduty;
mod popularity_contest;
mod proxyd;
@@ -77,6 +78,7 @@ enum SubCommand {
    CreateKey(create_key::CreateKeySubCommand),
    CreateUser(create_user::CreateUserSubCommand),
    DropMigrationLock(drop_migration_lock::DropMigrationLockSubCommand),
+    MigrateStatsToV2(migrate_stats_to_v2::MigrateStatsToV2),
    Pagerduty(pagerduty::PagerdutySubCommand),
    PopularityContest(popularity_contest::PopularityContestSubCommand),
    Proxyd(proxyd::ProxydSubCommand),
@@ -371,6 +373,14 @@ fn main() -> anyhow::Result<()> {

            x.main(&db_conn).await
        }
+        SubCommand::MigrateStatsToV2(x) => {
+            let db_url = cli_config
+                .db_url
+                .expect("'--config' (with a db) or '--db-url' is required to run the migration from stats-mysql to stats-influx");
+
+            let db_conn = get_db(db_url, 1, 1).await?;
+            x.main(&db_conn).await
+        }
        SubCommand::Pagerduty(x) => {
            if cli_config.sentry_url.is_none() {
                warn!("sentry_url is not set! Logs will only show in this console");
web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs (new file, 122 lines)
@@ -0,0 +1,122 @@
use anyhow::Context;
use argh::FromArgs;
use entities::{rpc_accounting, rpc_accounting_v2, user};
use ethers::types::Address;
use hashbrown::HashMap;
use log::{debug, info, warn};
use migration::sea_orm::{
    self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel,
    QueryFilter, QuerySelect,
};
use web3_proxy::stats::{BufferedRpcQueryStats, RpcQueryKey};

/// migrate stats from the old rpc_accounting table to rpc_accounting_v2.
#[derive(FromArgs, PartialEq, Eq, Debug)]
#[argh(subcommand, name = "migrate_stats_to_v2")]
pub struct MigrateStatsToV2 {}

impl MigrateStatsToV2 {
    pub async fn main(self, db_conn: &DatabaseConnection) -> anyhow::Result<()> {
        loop {
            // (1) Load a batch of rows out of the old table until no more rows are left
            let old_records = rpc_accounting::Entity::find()
                .limit(10000)
                .all(db_conn)
                .await?;
            if old_records.is_empty() {
                // Break out of the loop once all records have successfully been migrated ...
                warn!("All records seem to have been successfully migrated!");
                break;
            }

            // (2) Create request metadata objects to match the old data
            let mut global_timeseries_buffer = HashMap::<RpcQueryKey, BufferedRpcQueryStats>::new();
            let mut opt_in_timeseries_buffer = HashMap::<RpcQueryKey, BufferedRpcQueryStats>::new();
            let mut accounting_db_buffer = HashMap::<RpcQueryKey, BufferedRpcQueryStats>::new();

            // Iterate through all old rows, and put them into the above objects.
            for x in old_records {
                info!("Preparing for migration: {:?}", x);

                // // For each of the old rows, create a (i) RpcQueryKey and a matching BufferedRpcQueryStats object
                // let key = RpcQueryKey {
                //     response_timestamp: x.period_datetime.timestamp(),
                //     archive_needed: x.archive_needed,
                //     error_response: x.error_response,
                //     period_datetime: x.period_datetime.timestamp(),
                //     rpc_secret_key_id: x.rpc_key_id,
                //     origin: x.origin,
                //     method: x.method
                // };
                //
                // // Create the corresponding BufferedRpcQueryStats object
                // let val = BufferedRpcQueryStats {
                //     frontend_requests: x.frontend_requests,
                //     backend_requests: x.backend_requests,
                //     backend_retries: x.backend_retries,
                //     no_servers: x.no_servers,
                //     cache_misses: x.cache_misses,
                //     cache_hits: x.cache_hits,
                //     sum_request_bytes: x.sum_request_bytes,
                //     sum_response_bytes: x.sum_response_bytes,
                //     sum_response_millis: x.sum_response_millis
                // };
                //
                // // TODO: Create authorization, request metadata, and bytes ... but bytes we don't really keep track of!
                // // We can generate dummy bytes of the same length though, this may work as well
                //
                // // TODO: Period datetime is also a question of what it is
                // // let response_stat = RpcQueryStats::new(
                // //     x.method,
                // //     authorization.clone(),
                // //     request_metadata.clone(),
                // //     response_bytes,
                // //     x.period_datetime
                // // );
                //
                // // BufferedRpcQueryStats
            }
        }

        // (3) Update the batch in the old table with the current timestamp

        // (4) Send through a channel to a stat emitter

        // let old_address: Address = self.old_address.parse()?;
        // let new_address: Address = self.new_address.parse()?;
        //
        // let old_address: Vec<u8> = old_address.to_fixed_bytes().into();
        // let new_address: Vec<u8> = new_address.to_fixed_bytes().into();
        //
        // let u = user::Entity::find()
        //     .filter(user::Column::Address.eq(old_address))
        //     .one(db_conn)
        //     .await?
        //     .context("No user found with that address")?;
        //
        // debug!("initial user: {:#?}", u);
        //
        // if u.address == new_address {
        //     info!("user already has this address");
        // } else {
        //     let mut u = u.into_active_model();
        //
        //     u.address = sea_orm::Set(new_address);
        //
        //     let u = u.save(db_conn).await?;
        //
        //     info!("changed user address");
        //
        //     debug!("updated user: {:#?}", u);
        // }

        Ok(())
    }
}
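Step (3) above is still a TODO. A hedged sketch of what stamping a batch could look like inside the loop, once the entity is regenerated with the new `migrated` column (the field name and its `Option<NaiveDateTime>` type are assumptions based on the m20230307 migration):

```rust
use chrono::Utc;
use migration::sea_orm::{ActiveModelTrait, ActiveValue, IntoActiveModel};

// hypothetical step (3): mark each row of the batch as migrated so the outer
// loop eventually finds no remaining rows and breaks
for record in old_records {
    let mut active = record.into_active_model();
    active.migrated = ActiveValue::Set(Some(Utc::now().naive_utc()));
    active.update(db_conn).await?;
}
```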
@@ -178,6 +178,9 @@ pub struct AppConfig {
    /// influxdb token for stats
    pub influxdb_token: Option<String>,

+    /// influxdb bucket to use for stats
+    pub influxdb_bucket: Option<String>,
+
    /// unknown config options get put here
    #[serde(flatten, default = "HashMap::default")]
    pub extra: HashMap<String, serde_json::Value>,
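For context, a minimal sketch of how the new option would be read from TOML (assuming `toml::from_str` is used and the omitted AppConfig fields are optional or defaulted; `chain_id` is the one required field visible elsewhere in this diff). Unknown keys land in `extra` via `#[serde(flatten)]` instead of failing deserialization:

```rust
// hypothetical config snippet, not the app's real loader
let cfg: AppConfig = toml::from_str(
    r#"
chain_id = 1
influxdb_bucket = "web3_proxy"
some_future_option = 42
"#,
)?;

assert_eq!(cfg.influxdb_bucket.as_deref(), Some("web3_proxy"));
// unrecognized keys are collected rather than rejected
assert!(cfg.extra.contains_key("some_future_option"));
```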
@@ -219,7 +219,7 @@ pub fn get_query_window_seconds_from_params(
    params.get("query_window_seconds").map_or_else(
        || {
            // no page in params. set default
-            Ok(0)
+            Ok(1)
        },
        |query_window_seconds: &String| {
            // parse the given timestamp
@@ -230,3 +230,34 @@ pub fn get_query_window_seconds_from_params(
        },
    )
}
+
+pub fn get_stats_column_from_params(params: &HashMap<String, String>) -> Web3ProxyResult<&str> {
+    params.get("query_stats_column").map_or_else(
+        || Ok("frontend_requests"),
+        |query_stats_column: &String| {
+            // Must be one of: Otherwise respond with an error ...
+            match query_stats_column.as_str() {
+                "frontend_requests"
+                | "backend_requests"
+                | "cache_hits"
+                | "cache_misses"
+                | "no_servers"
+                | "sum_request_bytes"
+                | "sum_response_bytes"
+                | "sum_response_millis" => Ok(query_stats_column),
+                _ => Err(Web3ProxyError::BadRequest(
+                    "Unable to parse query_stats_column. It must be one of: \
+                    frontend_requests, \
+                    backend_requests, \
+                    cache_hits, \
+                    cache_misses, \
+                    no_servers, \
+                    sum_request_bytes, \
+                    sum_response_bytes, \
+                    sum_response_millis"
+                        .to_string(),
+                )),
+            }
+        },
+    )
+}
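A short usage sketch of the new helper: with the parameter absent it falls back to `frontend_requests`, and anything outside the whitelist becomes a `BadRequest`:

```rust
use hashbrown::HashMap;

let mut params: HashMap<String, String> = HashMap::new();
assert_eq!(get_stats_column_from_params(&params)?, "frontend_requests");

params.insert("query_stats_column".to_string(), "cache_hits".to_string());
assert_eq!(get_stats_column_from_params(&params)?, "cache_hits");

params.insert("query_stats_column".to_string(), "bogus".to_string());
assert!(get_stats_column_from_params(&params).is_err());
```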
@@ -22,206 +22,7 @@
use migration::{Condition, Expr, SimpleExpr};
use redis_rate_limiter::redis;
use redis_rate_limiter::redis::AsyncCommands;
use serde_json::json;

-// <<<<<<< HEAD:web3_proxy/src/user_queries.rs
-// /// get the attached address for the given bearer token.
-// /// First checks redis. Then checks the database.
-// /// 0 means all users.
-// /// This authenticates that the bearer is allowed to view this user_id's stats
-// pub async fn get_user_id_from_params(
-//     redis_conn: &mut RedisConnection,
-//     db_conn: &DatabaseConnection,
-//     db_replica: &DatabaseReplica,
-//     // this is a long type. should we strip it down?
-//     bearer: Option<TypedHeader<Authorization<Bearer>>>,
-//     params: &HashMap<String, String>,
-// ) -> Result<u64, FrontendErrorResponse> {
-//     debug!("bearer and params are: {:?} {:?}", bearer, params);
-//     match (bearer, params.get("user_id")) {
-//         (Some(TypedHeader(Authorization(bearer))), Some(user_id)) => {
-//             // check for the bearer cache key
-//             let user_bearer_token = UserBearerToken::try_from(bearer)?;
-//
-//             let user_redis_key = user_bearer_token.redis_key();
-//
-//             let mut save_to_redis = false;
-//
-//             // get the user id that is attached to this bearer token
-//             let bearer_user_id = match redis_conn.get::<_, u64>(&user_redis_key).await {
-//                 Err(_) => {
-//                     // TODO: inspect the redis error? if redis is down we should warn
-//                     // this also means redis being down will not kill our app. Everything will need a db read query though.
-//
-//                     let user_login = login::Entity::find()
-//                         .filter(login::Column::BearerToken.eq(user_bearer_token.uuid()))
-//                         .one(db_replica.conn())
-//                         .await
-//                         .context("database error while querying for user")?
-//                         .ok_or(FrontendErrorResponse::AccessDenied)?;
-//
-//                     // if expired, delete ALL expired logins
-//                     let now = Utc::now();
-//                     if now > user_login.expires_at {
-//                         // this row is expired! do not allow auth!
-//                         // delete ALL expired logins.
-//                         let delete_result = login::Entity::delete_many()
-//                             .filter(login::Column::ExpiresAt.lte(now))
-//                             .exec(db_conn)
-//                             .await?;
-//
-//                         // TODO: emit a stat? if this is high something weird might be happening
-//                         debug!("cleared expired logins: {:?}", delete_result);
-//
-//                         return Err(FrontendErrorResponse::AccessDenied);
-//                     }
-//
-//                     save_to_redis = true;
-//
-//                     user_login.user_id
-//                 }
-//                 Ok(x) => {
-//                     // TODO: push cache ttl further in the future?
-//                     x
-//                 }
-//             };
-//
-//             let user_id: u64 = user_id.parse().context("Parsing user_id param")?;
-//
-//             if bearer_user_id != user_id {
-//                 return Err(FrontendErrorResponse::AccessDenied);
-//             }
-//
-//             if save_to_redis {
-//                 // TODO: how long? we store in database for 4 weeks
-//                 const ONE_DAY: usize = 60 * 60 * 24;
-//
-//                 if let Err(err) = redis_conn
-//                     .set_ex::<_, _, ()>(user_redis_key, user_id, ONE_DAY)
-//                     .await
-//                 {
-//                     warn!("Unable to save user bearer token to redis: {}", err)
-//                 }
-//             }
-//
-//             Ok(bearer_user_id)
-//         }
-//         (_, None) => {
-//             // they have a bearer token. we don't care about it on public pages
-//             // 0 means all
-//             Ok(0)
-//         }
-//         (None, Some(_)) => {
-//             // they do not have a bearer token, but requested a specific id. block
-//             // TODO: proper error code from a useful error code
-//             // TODO: maybe instead of this sharp edged warn, we have a config value?
-//             // TODO: check config for if we should deny or allow this
-//             Err(FrontendErrorResponse::AccessDenied)
-//             // // TODO: make this a flag
-//             // warn!("allowing without auth during development!");
-//             // Ok(x.parse()?)
-//         }
-//     }
-// }
-//
-// /// only allow rpc_key to be set if user_id is also set.
-// /// this will keep people from reading someone else's keys.
-// /// 0 means none.
-//
-// pub fn get_rpc_key_id_from_params(
-//     user_id: u64,
-//     params: &HashMap<String, String>,
-// ) -> anyhow::Result<u64> {
-//     if user_id > 0 {
-//         params.get("rpc_key_id").map_or_else(
-//             || Ok(0),
-//             |c| {
-//                 let c = c.parse()?;
-//
-//                 Ok(c)
-//             },
-//         )
-//     } else {
-//         Ok(0)
-//     }
-// }
-//
-// pub fn get_chain_id_from_params(
-//     app: &Web3ProxyApp,
-//     params: &HashMap<String, String>,
-// ) -> anyhow::Result<u64> {
-//     params.get("chain_id").map_or_else(
-//         || Ok(app.config.chain_id),
-//         |c| {
-//             let c = c.parse()?;
-//
-//             Ok(c)
-//         },
-//     )
-// }
-//
-// pub fn get_query_start_from_params(
-//     params: &HashMap<String, String>,
-// ) -> anyhow::Result<chrono::NaiveDateTime> {
-//     params.get("query_start").map_or_else(
-//         || {
-//             // no timestamp in params. set default
-//             let x = chrono::Utc::now() - chrono::Duration::days(30);
-//
-//             Ok(x.naive_utc())
-//         },
-//         |x: &String| {
-//             // parse the given timestamp
-//             let x = x.parse::<i64>().context("parsing timestamp query param")?;
-//
-//             // TODO: error code 401
-//             let x =
-//                 NaiveDateTime::from_timestamp_opt(x, 0).context("parsing timestamp query param")?;
-//
-//             Ok(x)
-//         },
-//     )
-// }
-//
-// pub fn get_page_from_params(params: &HashMap<String, String>) -> anyhow::Result<u64> {
-//     params.get("page").map_or_else::<anyhow::Result<u64>, _, _>(
-//         || {
-//             // no page in params. set default
-//             Ok(0)
-//         },
-//         |x: &String| {
-//             // parse the given timestamp
-//             // TODO: error code 401
-//             let x = x.parse().context("parsing page query from params")?;
-//
-//             Ok(x)
-//         },
-//     )
-// }
-//
-// pub fn get_query_window_seconds_from_params(
-//     params: &HashMap<String, String>,
-// ) -> Result<u64, FrontendErrorResponse> {
-//     params.get("query_window_seconds").map_or_else(
-//         || {
-//             // no page in params. set default
-//             Ok(0)
-//         },
-//         |query_window_seconds: &String| {
-//             // parse the given timestamp
-//             // TODO: error code 401
-//             query_window_seconds.parse::<u64>().map_err(|e| {
-//                 FrontendErrorResponse::StatusCode(
-//                     StatusCode::BAD_REQUEST,
-//                     "Unable to parse rpc_key_id".to_string(),
-//                     Some(e.into()),
-//                 )
-//             })
-//         },
-//     )
-// }
-// =======
use super::StatType;
-// >>>>>>> 77df3fa (stats v2):web3_proxy/src/stats/db_queries.rs

pub fn filter_query_window_seconds(
    query_window_seconds: u64,
@@ -1,7 +1,8 @@
use super::StatType;
+use crate::http_params::get_stats_column_from_params;
use crate::{
    app::Web3ProxyApp,
-    frontend::errors::Web3ProxyResponse,
+    frontend::errors::{Web3ProxyError, Web3ProxyResponse},
    http_params::{
        get_chain_id_from_params, get_query_start_from_params, get_query_stop_from_params,
        get_query_window_seconds_from_params, get_user_id_from_params,
@@ -14,31 +15,58 @@ use axum::{
    Json, TypedHeader,
};
use chrono::{DateTime, FixedOffset};
use entities::{rpc_accounting, rpc_key};
use fstrings::{f, format_args_f};
use hashbrown::HashMap;
use influxdb2::models::Query;
use influxdb2::FromDataPoint;
use itertools::Itertools;
use log::{info, warn};
use serde::Serialize;
-use serde_json::json;
+use serde_json::{json, Number, Value};

// TODO: include chain_id, method, and some other things in this struct
+// This type-API is extremely brittle! Make sure that the types conform 1-to-1 as defined here
+// https://docs.rs/influxdb2-structmap/0.2.0/src/influxdb2_structmap/value.rs.html#1-98
+// TODO: Run rustfmt on it to see what the compiler produces for this
#[derive(Debug, Default, FromDataPoint, Serialize)]
pub struct AggregatedRpcAccounting {
-    field: String,
-    value: f64,
-    time: DateTime<FixedOffset>,
+    chain_id: String,
+    _field: String,
+    _value: i64,
+    _time: DateTime<FixedOffset>,
+    error_response: String,
+    archive_needed: String,
}

+#[derive(Debug, Default, FromDataPoint, Serialize)]
+pub struct DetailedRpcAccounting {
+    chain_id: String,
+    _field: String,
+    _value: i64,
+    _time: DateTime<FixedOffset>,
+    error_response: String,
+    archive_needed: String,
+    method: String,
+}
+
+// pub struct AggregatedRpcAccountingErrors {
+//     field: String,
+//     time: DateTime<FixedOffset>,
+//     archive_needed: f64
+// }
+
pub async fn query_user_stats<'a>(
    app: &'a Web3ProxyApp,
    bearer: Option<TypedHeader<Authorization<Bearer>>>,
    params: &'a HashMap<String, String>,
    stat_response_type: StatType,
) -> Web3ProxyResponse {
+    info!("Got this far 1");
    let db_conn = app.db_conn().context("query_user_stats needs a db")?;
    let db_replica = app
        .db_replica()
        .context("query_user_stats needs a db replica")?;
+    info!("Got this far 2");
    let mut redis_conn = app
        .redis_conn()
        .await
@@ -46,49 +74,132 @@ pub async fn query_user_stats<'a>(
        .context("query_user_stats needs a redis")?;

    // TODO: have a getter for this. do we need a connection pool on it?
+    info!("Got this far 3");
    let influxdb_client = app
        .influxdb_client
        .as_ref()
        .context("query_user_stats needs an influxdb client")?;

+    info!("Got this far 4");
    // get the user id first. if it is 0, we should use a cache on the app
    let user_id =
        get_user_id_from_params(&mut redis_conn, &db_conn, &db_replica, bearer, params).await?;

+    info!("Got this far 5");
    let query_window_seconds = get_query_window_seconds_from_params(params)?;
    let query_start = get_query_start_from_params(params)?.timestamp();
    let query_stop = get_query_stop_from_params(params)?.timestamp();
    let chain_id = get_chain_id_from_params(app, params)?;
+    let stats_column = get_stats_column_from_params(params)?;

+    // query_window_seconds must be provided, and should be not 1s (?) by default ..

+    // Return a bad request if query_start == query_stop, because then the query is empty basically
+    if query_start == query_stop {
+        return Err(Web3ProxyError::BadRequest(
+            "Start and Stop date cannot be equal. Please specify a (different) start date."
+                .to_owned(),
+        ));
+    }

+    info!("Got this far 6");
    let measurement = if user_id == 0 {
        "global_proxy"
    } else {
        "opt_in_proxy"
    };

-    let bucket = "web3_proxy";
    // from(bucket: "dev_web3_proxy")
    //     |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    //     |> filter(fn: (r) => r["_measurement"] == "opt_in_proxy" or r["_measurement"] == "global_proxy")
    //     |> filter(fn: (r) => r["_field"] == "frontend_requests" or r["_field"] == "backend_requests" or r["_field"] == "sum_request_bytes")
    //     |> group(columns: ["_field", "_measurement"])
    //     |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
    //     |> yield(name: "mean")

-    let mut group_columns = vec!["_measurement", "_field"];
+    // TODO: Should be taken from the config, not hardcoded ...
+    // TODO: Turn into a 500 error if bucket is not found ..
+    // Or just unwrap or so
+    let bucket = &app
+        .config
+        .influxdb_bucket
+        .clone()
+        .context("No influxdb bucket was provided")?; // "web3_proxy";
+    info!("Bucket is {:?}", bucket);

+    info!("Got this far 7");
+    // , "archive_needed", "error_response"
+    let mut group_columns = vec![
+        "chain_id",
+        "_measurement",
+        "_field",
+        "_measurement",
+        "error_response",
+        "archive_needed",
+    ];
    let mut filter_chain_id = "".to_string();

+    // Add to group columns the method, if we want the detailed view as well
+    match stat_response_type {
+        StatType::Detailed => {
+            group_columns.push("method");
+        }
+        _ => {}
+    }

    if chain_id == 0 {
        group_columns.push("chain_id");
    } else {
        filter_chain_id = f!(r#"|> filter(fn: (r) => r["chain_id"] == "{chain_id}")"#);
    }

+    info!("Got this far 8");
    let group_columns = serde_json::to_string(&json!(group_columns)).unwrap();

+    info!("Got this far 9");
    let group = match stat_response_type {
        StatType::Aggregated => f!(r#"|> group(columns: {group_columns})"#),
        StatType::Detailed => "".to_string(),
    };

+    info!("Got this far 10");
    let filter_field = match stat_response_type {
-        StatType::Aggregated => f!(r#"|> filter(fn: (r) => r["_field"] == "frontend_requests")"#),
+        StatType::Aggregated => {
+            f!(r#"|> filter(fn: (r) => r["_field"] == "{stats_column}")"#)
+        }
+        // TODO: Detailed should still filter it, but just "group-by" method (call it once per each method ...
+        // Or maybe it shouldn't filter it ...
        StatType::Detailed => "".to_string(),
    };

+    info!(
+        "Query start and stop are: {:?} {:?}",
+        query_start, query_stop
+    );
+    info!("Query column parameters are: {:?}", stats_column);
+    info!("Query measurement is: {:?}", measurement);
+    info!("Filters are: {:?} {:?}", filter_field, filter_chain_id);
+    info!("Group is: {:?}", group);
+    info!("window seconds are: {:?}", query_window_seconds);

+    // These are taken care of probably ...
+    // reg. fields, collect: backend_requests, frontend_requests, cache_hits, cache_misses, total_request_bytes, total_response_bytes, total_response_millis
+    // "total_frontend_requests": "6",
+    // "total_response_bytes": "235",
+    // "total_response_millis": "0"
+    // "total_cache_hits": "6",
+    // "total_cache_misses": "0",

+    // Perhaps gotta run a second query to get all error responses
+    // "total_error_responses": "0",
+    // Same with archive requests
+    // "archive_request": 0,

+    // Group by method if detailed, else just keep all methods as "null". i think influxdb takes care of that
+    // "method": null,
+    // "total_backend_retries": "0",

+    info!("Got this far 11");
    let query = f!(r#"
        from(bucket: "{bucket}")
            |> range(start: {query_start}, stop: {query_stop})
@@ -96,15 +207,274 @@ pub async fn query_user_stats<'a>(
            {filter_field}
            {filter_chain_id}
            {group}
-            |> aggregateWindow(every: {query_window_seconds}, fn: mean, createEmpty: false)
-            |> yield(name: "mean")
+            |> aggregateWindow(every: {query_window_seconds}s, fn: sum, createEmpty: false)
+            |> group()
    "#);

+    info!("Raw query to db is: {:?}", query);
    let query = Query::new(query.to_string());
+    info!("Query to db is: {:?}", query);

    // TODO: do not unwrap. add this error to FrontErrorResponse
    // TODO: StatType::Aggregated and StatType::Detailed might need different types
-    let res: Vec<AggregatedRpcAccounting> = influxdb_client.query(Some(query)).await?;
+    // let unparsed: serde_json::Value = serde_json::Value::Array(influxdb_client.query(Some(query.clone())).await?);
+    // info!("Direct response is: {:?}", unparsed);
+    info!("Got this far 12");

-    Ok(Json(json!(res)).into_response())
+    // Return a different result based on the query
+    let datapoints = match stat_response_type {
+        StatType::Aggregated => {
+            let influx_responses: Vec<AggregatedRpcAccounting> = influxdb_client
+                .query::<AggregatedRpcAccounting>(Some(query))
+                .await?;
+            info!("Influx responses are {:?}", &influx_responses);
+            for res in &influx_responses {
+                info!("Resp is: {:?}", res);
+            }
+
+            // let tmp = influx_responses.into_iter().group_by(|x| {x.time.timestamp()}).into_iter().collect::<Vec<_>>();
+            // info!("Printing grouped item {}", tmp);
+
+            // Group by all fields together ..
+            // let influx_responses = Vec::new();
+            // let grouped_items = Vec::new();
+
+            // let mut grouped_items = influx_responses
+            //     .into_iter()
+            //     .map(|x| {
+            //         (x.time.clone(), x)
+            //     })
+            //     .into_group_map();
+            // info!("Grouped items are {:?}", grouped_items);
+
+            influx_responses
+                .into_iter()
+                .map(|x| (x._time.clone(), x))
+                .into_group_map()
+                .into_iter()
+                .map(|(group, grouped_items)| {
+                    info!("Group is: {:?}", group);
+
+                    // Now put all the fields next to each other
+                    // (there will be exactly one field per timestamp, but we want to arrive at a new object)
+                    let mut out = HashMap::new();
+                    // Could also add a timestamp
+
+                    let mut archive_requests = 0;
+                    let mut error_responses = 0;
+
+                    out.insert("method".to_owned(), json!("null"));
+
+                    for x in grouped_items {
+                        info!("Iterating over grouped item {:?}", x);
+
+                        let key = format!("total_{}", x._field).to_string();
+                        info!("Looking at: {:?}", key);
+
+                        // Insert it once, and then fix it
+                        match out.get_mut(&key) {
+                            Some(existing) => {
+                                match existing {
+                                    Value::Number(old_value) => {
+                                        // unwrap will error when someone has too many credits ..
+                                        let old_value = old_value.as_i64().unwrap();
+                                        warn!("Old value is {:?}", old_value);
+                                        *existing = serde_json::Value::Number(Number::from(
+                                            old_value + x._value,
+                                        ));
+                                        warn!("New value is {:?}", old_value);
+                                    }
+                                    _ => {
+                                        panic!("Should be nothing but a number")
+                                    }
+                                };
+                            }
+                            None => {
+                                warn!("Does not exist yet! Insert new!");
+                                out.insert(key, serde_json::Value::Number(Number::from(x._value)));
+                            }
+                        };
+
+                        if !out.contains_key("query_window_timestamp") {
+                            out.insert(
+                                "query_window_timestamp".to_owned(),
+                                // serde_json::Value::Number(x.time.timestamp().into())
+                                json!(x._time.timestamp()),
+                            );
+                        }
+
+                        // Interpret archive needed as a boolean
+                        let archive_needed = match x.archive_needed.as_str() {
+                            "true" => true,
+                            "false" => false,
+                            _ => {
+                                panic!("This should never be!")
+                            }
+                        };
+                        let error_response = match x.error_response.as_str() {
+                            "true" => true,
+                            "false" => false,
+                            _ => {
+                                panic!("This should never be!")
+                            }
+                        };
+
+                        // Add up to archive requests and error responses
+                        // TODO: Gotta double check if errors & archive is based on frontend requests, or other metrics
+                        if x._field == "frontend_requests" && archive_needed {
+                            archive_requests += x._value as u64 // This is the number of requests
+                        }
+                        if x._field == "frontend_requests" && error_response {
+                            error_responses += x._value as u64
+                        }
+                    }
+
+                    out.insert("archive_request".to_owned(), json!(archive_requests));
+                    out.insert("error_response".to_owned(), json!(error_responses));
+
+                    json!(out)
+                })
+                .collect::<Vec<_>>()
+        }
+        StatType::Detailed => {
+            let influx_responses: Vec<DetailedRpcAccounting> = influxdb_client
+                .query::<DetailedRpcAccounting>(Some(query))
+                .await?;
+            info!("Influx responses are {:?}", &influx_responses);
+            for res in &influx_responses {
+                info!("Resp is: {:?}", res);
+            }
+
+            // Group by all fields together ..
+            influx_responses
+                .into_iter()
+                .map(|x| ((x._time.clone(), x.method.clone()), x))
+                .into_group_map()
+                .into_iter()
+                .map(|(group, grouped_items)| {
+                    // Now put all the fields next to each other
+                    // (there will be exactly one field per timestamp, but we want to arrive at a new object)
+                    let mut out = HashMap::new();
+                    // Could also add a timestamp
+
+                    let mut archive_requests = 0;
+                    let mut error_responses = 0;
+
+                    // Should probably move this outside ... (?)
+                    let method = group.1;
+                    out.insert("method".to_owned(), json!(method));
+
+                    for x in grouped_items {
+                        info!("Iterating over grouped item {:?}", x);
+
+                        let key = format!("total_{}", x._field).to_string();
+                        info!("Looking at: {:?}", key);
+
+                        // Insert it once, and then fix it
+                        match out.get_mut(&key) {
+                            Some(existing) => {
+                                match existing {
+                                    Value::Number(old_value) => {
+                                        // unwrap will error when someone has too many credits ..
+                                        let old_value = old_value.as_i64().unwrap();
+                                        warn!("Old value is {:?}", old_value);
+                                        *existing = serde_json::Value::Number(Number::from(
+                                            old_value + x._value,
+                                        ));
+                                        warn!("New value is {:?}", old_value);
+                                    }
+                                    _ => {
+                                        panic!("Should be nothing but a number")
+                                    }
+                                };
+                            }
+                            None => {
+                                warn!("Does not exist yet! Insert new!");
+                                out.insert(key, serde_json::Value::Number(Number::from(x._value)));
+                            }
+                        };
+
+                        if !out.contains_key("query_window_timestamp") {
+                            out.insert(
+                                "query_window_timestamp".to_owned(),
+                                // serde_json::Value::Number(x.time.timestamp().into())
+                                json!(x._time.timestamp()),
+                            );
+                        }
+
+                        // Interpret archive needed as a boolean
+                        let archive_needed = match x.archive_needed.as_str() {
+                            "true" => true,
+                            "false" => false,
+                            _ => {
+                                panic!("This should never be!")
+                            }
+                        };
+                        let error_response = match x.error_response.as_str() {
+                            "true" => true,
+                            "false" => false,
+                            _ => {
+                                panic!("This should never be!")
+                            }
+                        };
+
+                        // Add up to archive requests and error responses
+                        // TODO: Gotta double check if errors & archive is based on frontend requests, or other metrics
+                        if x._field == "frontend_requests" && archive_needed {
+                            archive_requests += x._value as i32 // This is the number of requests
+                        }
+                        if x._field == "frontend_requests" && error_response {
+                            error_responses += x._value as i32
+                        }
+                    }
+
+                    out.insert("archive_request".to_owned(), json!(archive_requests));
+                    out.insert("error_response".to_owned(), json!(error_responses));
+
+                    json!(out)
+                })
+                .collect::<Vec<_>>()
+        }
+    };
+
+    // I suppose archive requests could be either gathered by default (then summed up), or retrieved on a second go.
+    // Same with error responses ..
+    let mut response_body = HashMap::new();
+    response_body.insert(
+        "num_items",
+        serde_json::Value::Number(datapoints.len().into()),
+    );
+    response_body.insert("result", serde_json::Value::Array(datapoints));
+    response_body.insert(
+        "query_window_seconds",
+        serde_json::Value::Number(query_window_seconds.into()),
+    );
+    response_body.insert("query_start", serde_json::Value::Number(query_start.into()));
+    response_body.insert("chain_id", serde_json::Value::Number(chain_id.into()));
+
+    if user_id == 0 {
+        // 0 means everyone. don't filter on user
+    } else {
+        // q = q.left_join(rpc_key::Entity);
+        // condition = condition.add(rpc_key::Column::UserId.eq(user_id));
+        response_body.insert("user_id", serde_json::Value::Number(user_id.into()));
+    }
+
+    // Also optionally add the rpc_key_id:
+    if let Some(rpc_key_id) = params.get("rpc_key_id") {
+        let rpc_key_id = rpc_key_id
+            .parse::<u64>()
+            .map_err(|e| Web3ProxyError::BadRequest("Unable to parse rpc_key_id".to_string()))?;
+        response_body.insert("rpc_key_id", serde_json::Value::Number(rpc_key_id.into()));
+    }
+
+    info!("Got this far 13 {:?}", response_body);
+    let response = Json(json!(response_body)).into_response();
+    // Add the requests back into out
+
+    info!("Got this far 14 {:?}", response);
+
+    // TODO: Now implement the proper response type
+
+    Ok(response)
}
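The per-window merging above leans on itertools' `into_group_map`, which drains an iterator of `(key, value)` pairs into a `HashMap<K, Vec<V>>`. A standalone sketch of that primitive (toy data, not the real influx row types):

```rust
use itertools::Itertools;

// rows keyed by timestamp, mirroring how influx rows are regrouped per window
let rows = vec![
    (1_i64, "frontend_requests"),
    (1, "cache_hits"),
    (2, "frontend_requests"),
];

let grouped = rows.into_iter().into_group_map();

assert_eq!(grouped[&1], vec!["frontend_requests", "cache_hits"]);
assert_eq!(grouped[&2], vec!["frontend_requests"]);
```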
@@ -30,23 +30,24 @@ pub enum StatType {
    Detailed,
}

+// Pub is needed for migration ... I could also write a second constructor for this if needed
/// TODO: better name?
#[derive(Clone, Debug)]
pub struct RpcQueryStats {
-    authorization: Arc<Authorization>,
-    method: String,
-    archive_request: bool,
-    error_response: bool,
-    request_bytes: u64,
+    pub authorization: Arc<Authorization>,
+    pub method: String,
+    pub archive_request: bool,
+    pub error_response: bool,
+    pub request_bytes: u64,
    /// if backend_requests is 0, there was a cache_hit
-    backend_requests: u64,
-    response_bytes: u64,
-    response_millis: u64,
-    response_timestamp: i64,
+    pub backend_requests: u64,
+    pub response_bytes: u64,
+    pub response_millis: u64,
+    pub response_timestamp: i64,
}

#[derive(Clone, From, Hash, PartialEq, Eq)]
-struct RpcQueryKey {
+pub struct RpcQueryKey {
    /// unix epoch time
    /// for the time series db, this is (close to) the time that the response was sent
    /// for the account database, this is rounded to the week
@@ -167,15 +168,15 @@ impl RpcQueryStats {

#[derive(Default)]
pub struct BufferedRpcQueryStats {
-    frontend_requests: u64,
-    backend_requests: u64,
-    backend_retries: u64,
-    no_servers: u64,
-    cache_misses: u64,
-    cache_hits: u64,
-    sum_request_bytes: u64,
-    sum_response_bytes: u64,
-    sum_response_millis: u64,
+    pub frontend_requests: u64,
+    pub backend_requests: u64,
+    pub backend_retries: u64,
+    pub no_servers: u64,
+    pub cache_misses: u64,
+    pub cache_hits: u64,
+    pub sum_request_bytes: u64,
+    pub sum_response_bytes: u64,
+    pub sum_response_millis: u64,
}

/// A stat that we aggregate and then store in a database.
@@ -363,7 +364,7 @@ impl RpcQueryStats {
    method: String,
    authorization: Arc<Authorization>,
    metadata: Arc<RequestMetadata>,
-    response_bytes: usize,
+    response_bytes: usize
) -> Self {
    // TODO: try_unwrap the metadata to be sure that all the stats for this request have been collected
    // TODO: otherwise, i think the whole thing should be in a single lock that we can "reset" when a stat is created
@@ -389,11 +390,13 @@ impl RpcQueryStats {
        response_timestamp,
    }
}
+
}

impl StatBuffer {
    pub fn try_spawn(
        chain_id: u64,
+        bucket: String,
        db_conn: Option<DatabaseConnection>,
        influxdb_client: Option<influxdb2::Client>,
        db_save_interval_seconds: u32,
@@ -418,7 +421,7 @@ impl StatBuffer {

        // any errors inside this task will cause the application to exit
        let handle = tokio::spawn(async move {
-            new.aggregate_and_save_loop(stat_receiver, shutdown_receiver)
+            new.aggregate_and_save_loop(bucket, stat_receiver, shutdown_receiver)
                .await
        });

@@ -427,6 +430,7 @@ impl StatBuffer {

    async fn aggregate_and_save_loop(
        &mut self,
+        bucket: String,
        stat_receiver: flume::Receiver<AppStat>,
        mut shutdown_receiver: broadcast::Receiver<()>,
    ) -> anyhow::Result<()> {
@@ -440,6 +444,9 @@ impl StatBuffer {
        let mut opt_in_timeseries_buffer = HashMap::<RpcQueryKey, BufferedRpcQueryStats>::new();
        let mut accounting_db_buffer = HashMap::<RpcQueryKey, BufferedRpcQueryStats>::new();

+        // TODO: Somewhere here we should probably be updating the balance of the user
+        // And also update the credits used etc. for the referred user
+
        loop {
            tokio::select! {
                stat = stat_receiver.recv_async() => {
@@ -486,14 +493,14 @@ impl StatBuffer {

        for (key, stat) in global_timeseries_buffer.drain() {
            // TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
-            if let Err(err) = stat.save_timeseries("dev_web3_proxy", "global_proxy", self.chain_id, influxdb_client, key).await {
+            if let Err(err) = stat.save_timeseries(bucket.clone().as_ref(), "global_proxy", self.chain_id, influxdb_client, key).await {
                error!("unable to save global stat! err={:?}", err);
            };
        }

        for (key, stat) in opt_in_timeseries_buffer.drain() {
            // TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
-            if let Err(err) = stat.save_timeseries("dev_web3_proxy", "opt_in_proxy", self.chain_id, influxdb_client, key).await {
+            if let Err(err) = stat.save_timeseries(bucket.clone().as_ref(), "opt_in_proxy", self.chain_id, influxdb_client, key).await {
                error!("unable to save opt-in stat! err={:?}", err);
            };
        }
@@ -538,7 +545,7 @@ impl StatBuffer {
        for (key, stat) in global_timeseries_buffer.drain() {
            if let Err(err) = stat
                .save_timeseries(
-                    "dev_web3_proxy",
+                    &bucket,
                    "global_proxy",
                    self.chain_id,
                    influxdb_client,
@@ -561,7 +568,7 @@ impl StatBuffer {
        for (key, stat) in opt_in_timeseries_buffer.drain() {
            if let Err(err) = stat
                .save_timeseries(
-                    "dev_web3_proxy",
+                    &bucket,
                    "opt_in_proxy",
                    self.chain_id,
                    influxdb_client,