diff --git a/Cargo.lock b/Cargo.lock index 7e90972a..c7f4d6dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -142,12 +142,6 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64cb94155d965e3d37ffbbe7cc5b82c3dd79dd33bd48e536f73d2cfb8d85506f" -[[package]] -name = "arrayvec" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" - [[package]] name = "arrayvec" version = "0.7.2" @@ -504,28 +498,16 @@ dependencies = [ "radium 0.3.0", ] -[[package]] -name = "bitvec" -version = "0.19.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55f93d0ef3363c364d5976646a38f04cf67cfe1d4c8d160cdea02cab2c116b33" -dependencies = [ - "funty 1.1.0", - "radium 0.5.3", - "tap", - "wyz 0.2.0", -] - [[package]] name = "bitvec" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c" dependencies = [ - "funty 2.0.0", + "funty", "radium 0.7.0", "tap", - "wyz 0.5.1", + "wyz", ] [[package]] @@ -1683,7 +1665,7 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ade3e9c97727343984e1ceada4fdab11142d2ee3472d2c67027d56b1251d4f15" dependencies = [ - "arrayvec 0.7.2", + "arrayvec", "bytes", "cargo_metadata 0.15.3", "chrono", @@ -2013,12 +1995,6 @@ dependencies = [ "syn", ] -[[package]] -name = "funty" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fed34cd105917e91daa4da6b3728c47b068749d6a62c59811f06ed2ac71d9da7" - [[package]] name = "funty" version = "2.0.0" @@ -2326,7 +2302,7 @@ dependencies = [ "byteorder", "crossbeam-channel", "flate2", - "nom 7.1.3", + "nom", "num-traits", ] @@ -2643,9 +2619,9 @@ dependencies = [ [[package]] name = "influxdb2" -version = "0.3.5" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9adea4aa306353d8cdc2920bf9206af2c37275fe51835ab61e06fa3c5fbf14e" +checksum = "c239bb83aa8f411697335c63a608fe46f75a6f68898abc3cbcd813ae1f6fb329" dependencies = [ "base64 0.13.1", "bytes", @@ -2657,7 +2633,7 @@ dependencies = [ "go-parse-duration", "influxdb2-derive", "influxdb2-structmap", - "nom 6.1.2", + "nom", "opentelemetry", "ordered-float", "parking_lot 0.11.2", @@ -2675,9 +2651,9 @@ dependencies = [ [[package]] name = "influxdb2-derive" -version = "0.1.0" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1e007e3c8368af353f58831a0fdb1b6649df4a8f0a33aa6455fc69a896bbc30" +checksum = "990f899841aa30130fc06f7938e3cc2cbc3d5b92c03fd4b5d79a965045abcf16" dependencies = [ "itertools", "proc-macro2", @@ -2862,19 +2838,6 @@ dependencies = [ "spin 0.5.2", ] -[[package]] -name = "lexical-core" -version = "0.7.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6607c62aa161d23d17a9072cc5da0be67cdfc89d3afb1e8d9c842bebc2525ffe" -dependencies = [ - "arrayvec 0.5.2", - "bitflags", - "cfg-if", - "ryu", - "static_assertions", -] - [[package]] name = "libc" version = "0.2.139" @@ -3086,19 +3049,6 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e4a24736216ec316047a1fc4252e27dabb04218aa4a3f37c6e7ddbf1f9782b54" -[[package]] -name = "nom" -version = "6.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"e7413f999671bd4745a7b624bd370a569fb6bc574b23c83a3c5ed2e453f3d5e2" -dependencies = [ - "bitvec 0.19.6", - "funty 1.1.0", - "lexical-core", - "memchr", - "version_check", -] - [[package]] name = "nom" version = "7.1.3" @@ -3288,7 +3238,7 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "786393f80485445794f6043fd3138854dd109cc6c4bd1a6383db304c9ce9b9ce" dependencies = [ - "arrayvec 0.7.2", + "arrayvec", "auto_impl 1.0.1", "bytes", "ethereum-types", @@ -3439,7 +3389,7 @@ version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "637935964ff85a605d114591d4d2c13c5d1ba2806dae97cea6bf180238a749ac" dependencies = [ - "arrayvec 0.7.2", + "arrayvec", "bitvec 1.0.1", "byte-slice-cast", "impl-trait-for-tuples", @@ -3919,12 +3869,6 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "def50a86306165861203e7f84ecffbbdfdea79f0e51039b33de1e952358c47ac" -[[package]] -name = "radium" -version = "0.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "941ba9d78d8e2f7ce474c015eea4d9c6d25b6a3327f9832ee29a4de27f91bbb8" - [[package]] name = "radium" version = "0.7.0" @@ -4302,7 +4246,7 @@ version = "1.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e13cf35f7140155d02ba4ec3294373d513a3c7baa8364c162b030e33c61520a8" dependencies = [ - "arrayvec 0.7.2", + "arrayvec", "borsh", "bytecheck", "byteorder", @@ -4882,7 +4826,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c1a4ca38f4e746460d1dbd3711b8ca8ae314d1b21247edeff61dd20325b5a6f" dependencies = [ "heapless", - "nom 7.1.3", + "nom", "serde", "serde_plain", "thiserror", @@ -5154,7 +5098,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c12bc9199d1db8234678b7051747c07f517cdcf019262d1847b94ec8b1aee3e" dependencies = [ "itertools", - "nom 7.1.3", + "nom", "unicode_categories", ] @@ -6499,12 +6443,6 @@ dependencies = [ "web-sys", ] -[[package]] -name = "wyz" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85e60b0d1b5f99db2556934e21937020776a5d31520bf169e851ac44e6420214" - [[package]] name = "wyz" version = "0.5.1" diff --git a/config/example.toml b/config/example.toml index 7c61b8f5..9058b8dd 100644 --- a/config/example.toml +++ b/config/example.toml @@ -17,6 +17,7 @@ kafka_urls = "127.0.0.1:19092" influxdb_host = "http://127.0.0.1:18086" influxdb_org = "dev_org" influxdb_token = "dev_web3_proxy_auth_token" +influxdb_bucketname = "web3_proxy" # thundering herd protection # only mark a block as the head block if the sum of their soft limits is greater than or equal to min_sum_soft_limit diff --git a/docker-compose.yml b/docker-compose.yml index 1f757dc9..beda587d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -31,7 +31,7 @@ services: DOCKER_INFLUXDB_INIT_USERNAME: dev_web3_proxy DOCKER_INFLUXDB_INIT_PASSWORD: dev_web3_proxy DOCKER_INFLUXDB_INIT_ORG: dev_org - DOCKER_INFLUXDB_INIT_BUCKET: web3_proxy + DOCKER_INFLUXDB_INIT_BUCKET: dev_web3_proxy DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: dev_web3_proxy_auth_token ports: - 127.0.0.1:18086:8086 diff --git a/migration/src/lib.rs b/migration/src/lib.rs index cd4cbff6..0ada6ac7 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -18,6 +18,7 @@ mod m20230130_124740_read_only_login_logic; mod m20230130_165144_prepare_admin_imitation_pre_login; mod m20230215_152254_admin_trail; mod m20230125_204810_stats_v2; +mod 
m20230307_002623_migrate_rpc_accounting_to_rpc_accounting_v2; pub struct Migrator; @@ -43,6 +44,7 @@ impl MigratorTrait for Migrator { Box::new(m20230130_165144_prepare_admin_imitation_pre_login::Migration), Box::new(m20230215_152254_admin_trail::Migration), Box::new(m20230125_204810_stats_v2::Migration), + Box::new(m20230307_002623_migrate_rpc_accounting_to_rpc_accounting_v2::Migration), ] } } diff --git a/migration/src/m20230307_002623_migrate_rpc_accounting_to_rpc_accounting_v2.rs b/migration/src/m20230307_002623_migrate_rpc_accounting_to_rpc_accounting_v2.rs new file mode 100644 index 00000000..129fa4b3 --- /dev/null +++ b/migration/src/m20230307_002623_migrate_rpc_accounting_to_rpc_accounting_v2.rs @@ -0,0 +1,81 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Add a nullable timestamp column to check if things were migrated in the rpc_accounting table .. + manager + .alter_table( + Table::alter() + .table(RpcAccounting::Table) + .to_owned() + .add_column( + ColumnDef::new(RpcAccounting::Migrated) + .timestamp() + ) + .to_owned() + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(RpcAccounting::Table) + .drop_column(RpcAccounting::Migrated) + .to_owned() + ) + .await + } +} + +/// Learn more at https://docs.rs/sea-query#iden +#[derive(Iden)] +pub enum UserKeys { + Table, + Id, +} + +#[derive(Iden)] +enum RpcAccounting { + Table, + Id, + UserKeyId, + ChainId, + Method, + ErrorResponse, + PeriodDatetime, + FrontendRequests, + BackendRequests, + BackendRetries, + NoServers, + CacheMisses, + CacheHits, + SumRequestBytes, + MinRequestBytes, + MeanRequestBytes, + P50RequestBytes, + P90RequestBytes, + P99RequestBytes, + MaxRequestBytes, + SumResponseMillis, + MinResponseMillis, + MeanResponseMillis, + P50ResponseMillis, + P90ResponseMillis, + P99ResponseMillis, + MaxResponseMillis, + SumResponseBytes, + MinResponseBytes, + MeanResponseBytes, + P50ResponseBytes, + P90ResponseBytes, + P99ResponseBytes, + MaxResponseBytes, + Migrated +} + diff --git a/scripts/ethspam b/scripts/ethspam new file mode 100755 index 00000000..3915594f Binary files /dev/null and b/scripts/ethspam differ diff --git a/scripts/generate-requests-and-stats.sh b/scripts/generate-requests-and-stats.sh new file mode 100644 index 00000000..58cdf10b --- /dev/null +++ b/scripts/generate-requests-and-stats.sh @@ -0,0 +1,7 @@ +# Got eth spam from here + +# Got versus from here +# https://github.com/INFURA/versus +# ./ethspam | ./versus --stop-after 100 "http://localhost:8544/" # Pipe into the endpoint ..., add a bearer token and all that + +./ethspam http://127.0.0.1:8544 | ./versus --stop-after 100 http://localhost:8544 diff --git a/scripts/get-stats-aggregated.sh b/scripts/get-stats-aggregated.sh new file mode 100644 index 00000000..c1811988 --- /dev/null +++ b/scripts/get-stats-aggregated.sh @@ -0,0 +1,10 @@ +# Make a get request to get the stats in an aggregated fashion + +# I dont think we need a user id ... 
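# If per-user stats are wanted, the handler does take an optional bearer token
# (it resolves the caller via get_user_id_from_params), so a call along these lines
# would presumably work; the token and user_id below are placeholders, not real values.
#curl -X GET \
#  -H "Authorization: Bearer $WEB3_PROXY_BEARER_TOKEN" \
#  "http://localhost:8544/user/stats/aggregate?user_id=1&query_start=1678780033&query_window_seconds=1000"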
+ + +curl -X GET \ +"http://localhost:8544/user/stats/aggregate?query_start=1678780033&query_window_seconds=1000" + +#curl -X GET \ +#"http://localhost:8544/user/stats/detailed?query_start=1678780033&query_window_seconds=1000" diff --git a/scripts/versus b/scripts/versus new file mode 100755 index 00000000..6aff1a70 Binary files /dev/null and b/scripts/versus differ diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 6215463b..e39331c6 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -576,6 +576,7 @@ impl Web3ProxyApp { // stats can be saved in mysql, influxdb, both, or none let stat_sender = if let Some(emitter_spawn) = StatBuffer::try_spawn( top_config.app.chain_id, + top_config.app.influxdb_bucket.clone().context("No influxdb bucket was provided")?.to_owned(), db_conn.clone(), influxdb_client.clone(), 60, diff --git a/web3_proxy/src/bin/web3_proxy_cli/main.rs b/web3_proxy/src/bin/web3_proxy_cli/main.rs index 7d1d2b5d..88b64a0e 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/main.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/main.rs @@ -9,6 +9,7 @@ mod create_key; mod create_user; mod drop_migration_lock; mod list_user_tier; +mod migrate_stats_to_v2; mod pagerduty; mod popularity_contest; mod proxyd; @@ -77,6 +78,7 @@ enum SubCommand { CreateKey(create_key::CreateKeySubCommand), CreateUser(create_user::CreateUserSubCommand), DropMigrationLock(drop_migration_lock::DropMigrationLockSubCommand), + MigrateStatsToV2(migrate_stats_to_v2::MigrateStatsToV2), Pagerduty(pagerduty::PagerdutySubCommand), PopularityContest(popularity_contest::PopularityContestSubCommand), Proxyd(proxyd::ProxydSubCommand), @@ -371,6 +373,14 @@ fn main() -> anyhow::Result<()> { x.main(&db_conn).await } + SubCommand::MigrateStatsToV2(x) => { + let db_url = cli_config + .db_url + .expect("'--config' (with a db) or '--db-url' is required to run the migration from stats-mysql to stats-influx"); + + let db_conn = get_db(db_url, 1, 1).await?; + x.main(&db_conn).await + } SubCommand::Pagerduty(x) => { if cli_config.sentry_url.is_none() { warn!("sentry_url is not set! Logs will only show in this console"); diff --git a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs new file mode 100644 index 00000000..88d19dde --- /dev/null +++ b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs @@ -0,0 +1,122 @@ +use anyhow::Context; +use argh::FromArgs; +use entities::{rpc_accounting, rpc_accounting_v2, user}; +use ethers::types::Address; +use hashbrown::HashMap; +use log::{debug, info, warn}; +use migration::sea_orm::{ + self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel, + QueryFilter, QuerySelect +}; +use web3_proxy::stats::{BufferedRpcQueryStats, RpcQueryKey}; + +/// change a user's address. +#[derive(FromArgs, PartialEq, Eq, Debug)] +#[argh(subcommand, name = "migrate_stats_to_v2")] +pub struct MigrateStatsToV2 {} + +impl MigrateStatsToV2 { + pub async fn main(self, db_conn: &DatabaseConnection) -> anyhow::Result<()> { + + while true { + + // (1) Load a batch of rows out of the old table until no more rows are left + let old_records = rpc_accounting::Entity::find().limit(10000).all(db_conn).await?; + if old_records.len() == 0 { + // Break out of while loop once all records have successfully been migrated ... 
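                // NOTE: for this loop to terminate, the batch query above presumably has to skip
                // rows that were already handled. With the nullable `migrated` column added by the
                // m20230307 migration, that could look roughly like the sketch below (assuming the
                // rpc_accounting entity is regenerated so that `Column::Migrated` exists):
                //
                // let old_records = rpc_accounting::Entity::find()
                //     .filter(rpc_accounting::Column::Migrated.is_null())
                //     .limit(10000)
                //     .all(db_conn)
                //     .await?;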
+ warn!("All records seem to have been successfully migrated!"); + break; + } + + // (2) Create request metadata objects to match the old data + let mut global_timeseries_buffer = HashMap::::new(); + let mut opt_in_timeseries_buffer = HashMap::::new(); + let mut accounting_db_buffer = HashMap::::new(); + + // Iterate through all old rows, and put them into the above objects. + for x in old_records { + + info!("Preparing for migration: {:?}", x); + + // // For each of the old rows, create a (i) RpcQueryKey and a matching BufferedRpcQueryStats object + // let key = RpcQueryKey { + // response_timestamp: x.period_datetime.timestamp(), + // archive_needed: x.archive_needed, + // error_response: x.error_response, + // period_datetime: x.period_datetime.timestamp(), + // rpc_secret_key_id: x.rpc_key_id, + // origin: x.origin, + // method: x.method + // }; + // + // // Create the corresponding BufferedRpcQueryStats object + // let val = BufferedRpcQueryStats { + // frontend_requests: x.frontend_requests, + // backend_requests: x.backend_requests, + // backend_retries: x.backend_retries, + // no_servers: x.no_servers, + // cache_misses: x.cache_misses, + // cache_hits: x.cache_hits, + // sum_request_bytes: x.sum_request_bytes, + // sum_response_bytes: x.sum_response_bytes, + // sum_response_millis: x.sum_response_millis + // }; + // + // // TODO: Create authorization, request metadata, and bytes ... but bytes we don't really keep track of! + // // We can generate dummy bytes of the same length though, this may work as well + // + // // TODO: Period datetime is also a question of what it is + // // let response_stat = RpcQueryStats::new( + // // x.method, + // // authorization.clone(), + // // request_metadata.clone(), + // // response_bytes, + // // x.period_datetime + // // ); + // + // // BufferedRpcQueryStats + } + + } + + + + + + // (3) Update the batch in the old table with the current timestamp + + // (4) Send through a channel to a stat emitter + + + + // let old_address: Address = self.old_address.parse()?; + // let new_address: Address = self.new_address.parse()?; + // + // let old_address: Vec = old_address.to_fixed_bytes().into(); + // let new_address: Vec = new_address.to_fixed_bytes().into(); + // + // let u = user::Entity::find() + // .filter(user::Column::Address.eq(old_address)) + // .one(db_conn) + // .await? + // .context("No user found with that address")?; + // + // debug!("initial user: {:#?}", u); + // + // if u.address == new_address { + // info!("user already has this address"); + // } else { + // let mut u = u.into_active_model(); + // + // u.address = sea_orm::Set(new_address); + // + // let u = u.save(db_conn).await?; + // + // info!("changed user address"); + // + // debug!("updated user: {:#?}", u); + // } + + Ok(()) + } +} diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index f9010902..1ba1b298 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -178,6 +178,9 @@ pub struct AppConfig { /// influxdb token for stats pub influxdb_token: Option, + /// influxdb bucket to use for stats + pub influxdb_bucket: Option, + /// unknown config options get put here #[serde(flatten, default = "HashMap::default")] pub extra: HashMap, diff --git a/web3_proxy/src/http_params.rs b/web3_proxy/src/http_params.rs index 1ea7afee..754857cf 100644 --- a/web3_proxy/src/http_params.rs +++ b/web3_proxy/src/http_params.rs @@ -219,7 +219,7 @@ pub fn get_query_window_seconds_from_params( params.get("query_window_seconds").map_or_else( || { // no page in params. 
set default - Ok(0) + Ok(1) }, |query_window_seconds: &String| { // parse the given timestamp @@ -230,3 +230,34 @@ pub fn get_query_window_seconds_from_params( }, ) } + +pub fn get_stats_column_from_params(params: &HashMap) -> Web3ProxyResult<&str> { + params.get("query_stats_column").map_or_else( + || Ok("frontend_requests"), + |query_stats_column: &String| { + // Must be one of: Otherwise respond with an error ... + match query_stats_column.as_str() { + "frontend_requests" + | "backend_requests" + | "cache_hits" + | "cache_misses" + | "no_servers" + | "sum_request_bytes" + | "sum_response_bytes" + | "sum_response_millis" => Ok(query_stats_column), + _ => Err(Web3ProxyError::BadRequest( + "Unable to parse query_stats_column. It must be one of: \ + frontend_requests, \ + backend_requests, \ + cache_hits, \ + cache_misses, \ + no_servers, \ + sum_request_bytes, \ + sum_response_bytes, \ + sum_response_millis" + .to_string(), + )), + } + }, + ) +} diff --git a/web3_proxy/src/stats/db_queries.rs b/web3_proxy/src/stats/db_queries.rs index 15a8808f..a5ccc2a9 100644 --- a/web3_proxy/src/stats/db_queries.rs +++ b/web3_proxy/src/stats/db_queries.rs @@ -22,206 +22,7 @@ use migration::{Condition, Expr, SimpleExpr}; use redis_rate_limiter::redis; use redis_rate_limiter::redis::AsyncCommands; use serde_json::json; - -// <<<<<<< HEAD:web3_proxy/src/user_queries.rs -// /// get the attached address for the given bearer token. -// /// First checks redis. Then checks the database. -// /// 0 means all users. -// /// This authenticates that the bearer is allowed to view this user_id's stats -// pub async fn get_user_id_from_params( -// redis_conn: &mut RedisConnection, -// db_conn: &DatabaseConnection, -// db_replica: &DatabaseReplica, -// // this is a long type. should we strip it down? -// bearer: Option>>, -// params: &HashMap, -// ) -> Result { -// debug!("bearer and params are: {:?} {:?}", bearer, params); -// match (bearer, params.get("user_id")) { -// (Some(TypedHeader(Authorization(bearer))), Some(user_id)) => { -// // check for the bearer cache key -// let user_bearer_token = UserBearerToken::try_from(bearer)?; -// -// let user_redis_key = user_bearer_token.redis_key(); -// -// let mut save_to_redis = false; -// -// // get the user id that is attached to this bearer token -// let bearer_user_id = match redis_conn.get::<_, u64>(&user_redis_key).await { -// Err(_) => { -// // TODO: inspect the redis error? if redis is down we should warn -// // this also means redis being down will not kill our app. Everything will need a db read query though. -// -// let user_login = login::Entity::find() -// .filter(login::Column::BearerToken.eq(user_bearer_token.uuid())) -// .one(db_replica.conn()) -// .await -// .context("database error while querying for user")? -// .ok_or(FrontendErrorResponse::AccessDenied)?; -// -// // if expired, delete ALL expired logins -// let now = Utc::now(); -// if now > user_login.expires_at { -// // this row is expired! do not allow auth! -// // delete ALL expired logins. -// let delete_result = login::Entity::delete_many() -// .filter(login::Column::ExpiresAt.lte(now)) -// .exec(db_conn) -// .await?; -// -// // TODO: emit a stat? if this is high something weird might be happening -// debug!("cleared expired logins: {:?}", delete_result); -// -// return Err(FrontendErrorResponse::AccessDenied); -// } -// -// save_to_redis = true; -// -// user_login.user_id -// } -// Ok(x) => { -// // TODO: push cache ttl further in the future? 
-// x -// } -// }; -// -// let user_id: u64 = user_id.parse().context("Parsing user_id param")?; -// -// if bearer_user_id != user_id { -// return Err(FrontendErrorResponse::AccessDenied); -// } -// -// if save_to_redis { -// // TODO: how long? we store in database for 4 weeks -// const ONE_DAY: usize = 60 * 60 * 24; -// -// if let Err(err) = redis_conn -// .set_ex::<_, _, ()>(user_redis_key, user_id, ONE_DAY) -// .await -// { -// warn!("Unable to save user bearer token to redis: {}", err) -// } -// } -// -// Ok(bearer_user_id) -// } -// (_, None) => { -// // they have a bearer token. we don't care about it on public pages -// // 0 means all -// Ok(0) -// } -// (None, Some(_)) => { -// // they do not have a bearer token, but requested a specific id. block -// // TODO: proper error code from a useful error code -// // TODO: maybe instead of this sharp edged warn, we have a config value? -// // TODO: check config for if we should deny or allow this -// Err(FrontendErrorResponse::AccessDenied) -// // // TODO: make this a flag -// // warn!("allowing without auth during development!"); -// // Ok(x.parse()?) -// } -// } -// } -// -// /// only allow rpc_key to be set if user_id is also set. -// /// this will keep people from reading someone else's keys. -// /// 0 means none. -// -// pub fn get_rpc_key_id_from_params( -// user_id: u64, -// params: &HashMap, -// ) -> anyhow::Result { -// if user_id > 0 { -// params.get("rpc_key_id").map_or_else( -// || Ok(0), -// |c| { -// let c = c.parse()?; -// -// Ok(c) -// }, -// ) -// } else { -// Ok(0) -// } -// } -// -// pub fn get_chain_id_from_params( -// app: &Web3ProxyApp, -// params: &HashMap, -// ) -> anyhow::Result { -// params.get("chain_id").map_or_else( -// || Ok(app.config.chain_id), -// |c| { -// let c = c.parse()?; -// -// Ok(c) -// }, -// ) -// } -// -// pub fn get_query_start_from_params( -// params: &HashMap, -// ) -> anyhow::Result { -// params.get("query_start").map_or_else( -// || { -// // no timestamp in params. set default -// let x = chrono::Utc::now() - chrono::Duration::days(30); -// -// Ok(x.naive_utc()) -// }, -// |x: &String| { -// // parse the given timestamp -// let x = x.parse::().context("parsing timestamp query param")?; -// -// // TODO: error code 401 -// let x = -// NaiveDateTime::from_timestamp_opt(x, 0).context("parsing timestamp query param")?; -// -// Ok(x) -// }, -// ) -// } -// -// pub fn get_page_from_params(params: &HashMap) -> anyhow::Result { -// params.get("page").map_or_else::, _, _>( -// || { -// // no page in params. set default -// Ok(0) -// }, -// |x: &String| { -// // parse the given timestamp -// // TODO: error code 401 -// let x = x.parse().context("parsing page query from params")?; -// -// Ok(x) -// }, -// ) -// } -// -// pub fn get_query_window_seconds_from_params( -// params: &HashMap, -// ) -> Result { -// params.get("query_window_seconds").map_or_else( -// || { -// // no page in params. 
set default -// Ok(0) -// }, -// |query_window_seconds: &String| { -// // parse the given timestamp -// // TODO: error code 401 -// query_window_seconds.parse::().map_err(|e| { -// FrontendErrorResponse::StatusCode( -// StatusCode::BAD_REQUEST, -// "Unable to parse rpc_key_id".to_string(), -// Some(e.into()), -// ) -// }) -// }, -// ) -// } -// ======= use super::StatType; -// >>>>>>> 77df3fa (stats v2):web3_proxy/src/stats/db_queries.rs pub fn filter_query_window_seconds( query_window_seconds: u64, diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs index dc36807b..1fa267b1 100644 --- a/web3_proxy/src/stats/influxdb_queries.rs +++ b/web3_proxy/src/stats/influxdb_queries.rs @@ -1,7 +1,8 @@ use super::StatType; +use crate::http_params::get_stats_column_from_params; use crate::{ app::Web3ProxyApp, - frontend::errors::Web3ProxyResponse, + frontend::errors::{Web3ProxyError, Web3ProxyResponse}, http_params::{ get_chain_id_from_params, get_query_start_from_params, get_query_stop_from_params, get_query_window_seconds_from_params, get_user_id_from_params, @@ -14,31 +15,58 @@ use axum::{ Json, TypedHeader, }; use chrono::{DateTime, FixedOffset}; +use entities::{rpc_accounting, rpc_key}; use fstrings::{f, format_args_f}; use hashbrown::HashMap; use influxdb2::models::Query; use influxdb2::FromDataPoint; +use itertools::Itertools; +use log::{info, warn}; use serde::Serialize; -use serde_json::json; +use serde_json::{json, Number, Value}; -// TODO: include chain_id, method, and some other things in this struct +// This type-API is extremely brittle! Make sure that the types conform 1-to-1 as defined here +// https://docs.rs/influxdb2-structmap/0.2.0/src/influxdb2_structmap/value.rs.html#1-98 +// TODO: Run rustformat on it to see what the compiled produces for this #[derive(Debug, Default, FromDataPoint, Serialize)] pub struct AggregatedRpcAccounting { - field: String, - value: f64, - time: DateTime, + chain_id: String, + _field: String, + _value: i64, + _time: DateTime, + error_response: String, + archive_needed: String, } +#[derive(Debug, Default, FromDataPoint, Serialize)] +pub struct DetailedRpcAccounting { + chain_id: String, + _field: String, + _value: i64, + _time: DateTime, + error_response: String, + archive_needed: String, + method: String, +} + +// pub struct AggregatedRpcAccountingErrors { +// field: String, +// time: DateTime, +// archive_needed: f64 +// } + pub async fn query_user_stats<'a>( app: &'a Web3ProxyApp, bearer: Option>>, params: &'a HashMap, stat_response_type: StatType, ) -> Web3ProxyResponse { + info!("Got this far 1"); let db_conn = app.db_conn().context("query_user_stats needs a db")?; let db_replica = app .db_replica() .context("query_user_stats needs a db replica")?; + info!("Got this far 2"); let mut redis_conn = app .redis_conn() .await @@ -46,49 +74,132 @@ pub async fn query_user_stats<'a>( .context("query_user_stats needs a redis")?; // TODO: have a getter for this. do we need a connection pool on it? + info!("Got this far 3"); let influxdb_client = app .influxdb_client .as_ref() .context("query_user_stats needs an influxdb client")?; + info!("Got this far 4"); // get the user id first. 
if it is 0, we should use a cache on the app let user_id = get_user_id_from_params(&mut redis_conn, &db_conn, &db_replica, bearer, params).await?; + info!("Got this far 5"); let query_window_seconds = get_query_window_seconds_from_params(params)?; let query_start = get_query_start_from_params(params)?.timestamp(); let query_stop = get_query_stop_from_params(params)?.timestamp(); let chain_id = get_chain_id_from_params(app, params)?; + let stats_column = get_stats_column_from_params(params)?; + // query_window_seconds must be provided, and should be not 1s (?) by default .. + + // Return a bad request if query_start == query_stop, because then the query is empty basically + if query_start == query_stop { + return Err(Web3ProxyError::BadRequest( + "Start and Stop date cannot be equal. Please specify a (different) start date." + .to_owned(), + )); + } + + info!("Got this far 6"); let measurement = if user_id == 0 { "global_proxy" } else { "opt_in_proxy" }; - let bucket = "web3_proxy"; + // from(bucket: "dev_web3_proxy") + // |> range(start: v.timeRangeStart, stop: v.timeRangeStop) + // |> filter(fn: (r) => r["_measurement"] == "opt_in_proxy" or r["_measurement"] == "global_proxy") + // |> filter(fn: (r) => r["_field"] == "frontend_requests" or r["_field"] == "backend_requests" or r["_field"] == "sum_request_bytes") + // |> group(columns: ["_field", "_measurement"]) + // |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false) + // |> yield(name: "mean") - let mut group_columns = vec!["_measurement", "_field"]; + // TODO: Should be taken from the config, not hardcoded ... + // TODO: Turn into a 500 error if bucket is not found .. + // Or just unwrap or so + let bucket = &app + .config + .influxdb_bucket + .clone() + .context("No influxdb bucket was provided")?; // "web3_proxy"; + info!("Bucket is {:?}", bucket); + + info!("Got this far 7"); + // , "archive_needed", "error_response" + let mut group_columns = vec![ + "chain_id", + "_measurement", + "_field", + "_measurement", + "error_response", + "archive_needed", + ]; let mut filter_chain_id = "".to_string(); + // Add to group columns the method, if we want the detailed view as well + match stat_response_type { + StatType::Detailed => { + group_columns.push("method"); + } + _ => {} + } + if chain_id == 0 { group_columns.push("chain_id"); } else { filter_chain_id = f!(r#"|> filter(fn: (r) => r["chain_id"] == "{chain_id}")"#); } + info!("Got this far 8"); let group_columns = serde_json::to_string(&json!(group_columns)).unwrap(); + info!("Got this far 9"); let group = match stat_response_type { StatType::Aggregated => f!(r#"|> group(columns: {group_columns})"#), StatType::Detailed => "".to_string(), }; + info!("Got this far 10"); let filter_field = match stat_response_type { - StatType::Aggregated => f!(r#"|> filter(fn: (r) => r["_field"] == "frontend_requests")"#), + StatType::Aggregated => { + f!(r#"|> filter(fn: (r) => r["_field"] == "{stats_column}")"#) + } + // TODO: Detailed should still filter it, but just "group-by" method (call it once per each method ... + // Or maybe it shouldn't filter it ... 
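        // For reference, the Aggregated arm above renders (with the default stats column) roughly
        //   |> filter(fn: (r) => r["_field"] == "frontend_requests")
        // while this Detailed arm leaves the field filter empty, so every field comes back from
        // influx and is summed per (timestamp, method) group further down.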
StatType::Detailed => "".to_string(), }; + info!( + "Query start and stop are: {:?} {:?}", + query_start, query_stop + ); + info!("Query column parameters are: {:?}", stats_column); + info!("Query measurement is: {:?}", measurement); + info!("Filters are: {:?} {:?}", filter_field, filter_chain_id); + info!("Group is: {:?}", group); + info!("window seconds are: {:?}", query_window_seconds); + + // These are taken care of probably ... + // reg. fields, collect: backend_requests, frontend_requests, cache_hits, cache_misses, total_request_bytes, total_response_bytes, total_response_millis + // "total_frontend_requests": "6", + // "total_response_bytes": "235", + // "total_response_millis": "0" + // "total_cache_hits": "6", + // "total_cache_misses": "0", + + // Perhaps gotta run a second query to get all error responses + // "total_error_responses": "0", + // Same with archive requests + // "archive_request": 0, + + // Group by method if detailed, else just keep all methods as "null". i think influxdb takes care of that + // "method": null, + // "total_backend_retries": "0", + + info!("Got this far 11"); let query = f!(r#" from(bucket: "{bucket}") |> range(start: {query_start}, stop: {query_stop}) @@ -96,15 +207,274 @@ pub async fn query_user_stats<'a>( {filter_field} {filter_chain_id} {group} - |> aggregateWindow(every: {query_window_seconds}, fn: mean, createEmpty: false) - |> yield(name: "mean") + |> aggregateWindow(every: {query_window_seconds}s, fn: sum, createEmpty: false) + |> group() "#); + info!("Raw query to db is: {:?}", query); let query = Query::new(query.to_string()); + info!("Query to db is: {:?}", query); // TODO: do not unwrap. add this error to FrontErrorResponse // TODO: StatType::Aggregated and StatType::Detailed might need different types - let res: Vec = influxdb_client.query(Some(query)).await?; + // let unparsed: serde_json::Value = serde_json::Value::Array(influxdb_client.query(Some(query.clone())).await?); + // info!("Direct response is: {:?}", unparsed); + info!("Got this far 12"); - Ok(Json(json!(res)).into_response()) + // Return a different result based on the query + let datapoints = match stat_response_type { + StatType::Aggregated => { + let influx_responses: Vec = influxdb_client + .query::(Some(query)) + .await?; + info!("Influx responses are {:?}", &influx_responses); + for res in &influx_responses { + info!("Resp is: {:?}", res); + } + + // let tmp = influx_responses.into_iter().group_by(|x| {x.time.timestamp()}).into_iter().collect::>(); + // info!("Printing grouped item {}", tmp); + + // Group by all fields together .. 
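                // The chain below keys every row by its timestamp and uses itertools'
                // `into_group_map()`, which turns an iterator of (key, value) pairs into a
                // HashMap<K, Vec<V>>, e.g. [(t1, a), (t1, b), (t2, c)] -> {t1: [a, b], t2: [c]}.
                // Each per-timestamp Vec is then folded into a single JSON object with one
                // "total_<field>" entry per field.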
+ // let influx_responses = Vec::new(); + // let grouped_items = Vec::new(); + + // let mut grouped_items = influx_responses + // .into_iter() + // .map(|x| { + // (x.time.clone(), x) + // }) + // .into_group_map(); + // info!("Grouped items are {:?}", grouped_items); + + influx_responses + .into_iter() + .map(|x| (x._time.clone(), x)) + .into_group_map() + .into_iter() + .map(|(group, grouped_items)| { + info!("Group is: {:?}", group); + + // Now put all the fields next to each other + // (there will be exactly one field per timestamp, but we want to arrive at a new object) + let mut out = HashMap::new(); + // Could also add a timestamp + + let mut archive_requests = 0; + let mut error_responses = 0; + + out.insert("method".to_owned(), json!("null")); + + for x in grouped_items { + info!("Iterating over grouped item {:?}", x); + + let key = format!("total_{}", x._field).to_string(); + info!("Looking at: {:?}", key); + + // Insert it once, and then fix it + match out.get_mut(&key) { + Some(existing) => { + match existing { + Value::Number(old_value) => { + // unwrap will error when someone has too many credits .. + let old_value = old_value.as_i64().unwrap(); + warn!("Old value is {:?}", old_value); + *existing = serde_json::Value::Number(Number::from( + old_value + x._value, + )); + warn!("New value is {:?}", old_value); + } + _ => { + panic!("Should be nothing but a number") + } + }; + } + None => { + warn!("Does not exist yet! Insert new!"); + out.insert(key, serde_json::Value::Number(Number::from(x._value))); + } + }; + + if !out.contains_key("query_window_timestamp") { + out.insert( + "query_window_timestamp".to_owned(), + // serde_json::Value::Number(x.time.timestamp().into()) + json!(x._time.timestamp()), + ); + } + + // Interpret archive needed as a boolean + let archive_needed = match x.archive_needed.as_str() { + "true" => true, + "false" => false, + _ => { + panic!("This should never be!") + } + }; + let error_response = match x.error_response.as_str() { + "true" => true, + "false" => false, + _ => { + panic!("This should never be!") + } + }; + + // Add up to archive requests and error responses + // TODO: Gotta double check if errors & archive is based on frontend requests, or other metrics + if x._field == "frontend_requests" && archive_needed { + archive_requests += x._value as u64 // This is the number of requests + } + if x._field == "frontend_requests" && error_response { + error_responses += x._value as u64 + } + } + + out.insert("archive_request".to_owned(), json!(archive_requests)); + out.insert("error_response".to_owned(), json!(error_responses)); + + json!(out) + }) + .collect::>() + } + StatType::Detailed => { + let influx_responses: Vec = influxdb_client + .query::(Some(query)) + .await?; + info!("Influx responses are {:?}", &influx_responses); + for res in &influx_responses { + info!("Resp is: {:?}", res); + } + + // Group by all fields together .. + influx_responses + .into_iter() + .map(|x| ((x._time.clone(), x.method.clone()), x)) + .into_group_map() + .into_iter() + .map(|(group, grouped_items)| { + // Now put all the fields next to each other + // (there will be exactly one field per timestamp, but we want to arrive at a new object) + let mut out = HashMap::new(); + // Could also add a timestamp + + let mut archive_requests = 0; + let mut error_responses = 0; + + // Should probably move this outside ... (?) 
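                            // `group` is the (timestamp, method) tuple used as the group_map key,
                            // so `group.1` is the RPC method that this bucket of rows belongs to.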
+ let method = group.1; + out.insert("method".to_owned(), json!(method)); + + for x in grouped_items { + info!("Iterating over grouped item {:?}", x); + + let key = format!("total_{}", x._field).to_string(); + info!("Looking at: {:?}", key); + + // Insert it once, and then fix it + match out.get_mut(&key) { + Some(existing) => { + match existing { + Value::Number(old_value) => { + // unwrap will error when someone has too many credits .. + let old_value = old_value.as_i64().unwrap(); + warn!("Old value is {:?}", old_value); + *existing = serde_json::Value::Number(Number::from( + old_value + x._value, + )); + warn!("New value is {:?}", old_value); + } + _ => { + panic!("Should be nothing but a number") + } + }; + } + None => { + warn!("Does not exist yet! Insert new!"); + out.insert(key, serde_json::Value::Number(Number::from(x._value))); + } + }; + + if !out.contains_key("query_window_timestamp") { + out.insert( + "query_window_timestamp".to_owned(), + // serde_json::Value::Number(x.time.timestamp().into()) + json!(x._time.timestamp()), + ); + } + + // Interpret archive needed as a boolean + let archive_needed = match x.archive_needed.as_str() { + "true" => true, + "false" => false, + _ => { + panic!("This should never be!") + } + }; + let error_response = match x.error_response.as_str() { + "true" => true, + "false" => false, + _ => { + panic!("This should never be!") + } + }; + + // Add up to archive requests and error responses + // TODO: Gotta double check if errors & archive is based on frontend requests, or other metrics + if x._field == "frontend_requests" && archive_needed { + archive_requests += x._value as i32 // This is the number of requests + } + if x._field == "frontend_requests" && error_response { + error_responses += x._value as i32 + } + } + + out.insert("archive_request".to_owned(), json!(archive_requests)); + out.insert("error_response".to_owned(), json!(error_responses)); + + json!(out) + }) + .collect::>() + } + }; + + // I suppose archive requests could be either gathered by default (then summed up), or retrieved on a second go. + // Same with error responses .. + let mut response_body = HashMap::new(); + response_body.insert( + "num_items", + serde_json::Value::Number(datapoints.len().into()), + ); + response_body.insert("result", serde_json::Value::Array(datapoints)); + response_body.insert( + "query_window_seconds", + serde_json::Value::Number(query_window_seconds.into()), + ); + response_body.insert("query_start", serde_json::Value::Number(query_start.into())); + response_body.insert("chain_id", serde_json::Value::Number(chain_id.into())); + + if user_id == 0 { + // 0 means everyone. 
don't filter on user + } else { + // q = q.left_join(rpc_key::Entity); + // condition = condition.add(rpc_key::Column::UserId.eq(user_id)); + response_body.insert("user_id", serde_json::Value::Number(user_id.into())); + } + + // Also optionally add the rpc_key_id: + if let Some(rpc_key_id) = params.get("rpc_key_id") { + let rpc_key_id = rpc_key_id + .parse::() + .map_err(|e| Web3ProxyError::BadRequest("Unable to parse rpc_key_id".to_string()))?; + response_body.insert("rpc_key_id", serde_json::Value::Number(rpc_key_id.into())); + } + + info!("Got this far 13 {:?}", response_body); + let response = Json(json!(response_body)).into_response(); + // Add the requests back into out + + info!("Got this far 14 {:?}", response); + + // TODO: Now impplement the proper response type + + Ok(response) } diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index e0ad8f39..99cb77df 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -30,23 +30,24 @@ pub enum StatType { Detailed, } +// Pub is needed for migration ... I could also write a second constructor for this if needed /// TODO: better name? #[derive(Clone, Debug)] pub struct RpcQueryStats { - authorization: Arc, - method: String, - archive_request: bool, - error_response: bool, - request_bytes: u64, + pub authorization: Arc, + pub method: String, + pub archive_request: bool, + pub error_response: bool, + pub request_bytes: u64, /// if backend_requests is 0, there was a cache_hit - backend_requests: u64, - response_bytes: u64, - response_millis: u64, - response_timestamp: i64, + pub backend_requests: u64, + pub response_bytes: u64, + pub response_millis: u64, + pub response_timestamp: i64, } #[derive(Clone, From, Hash, PartialEq, Eq)] -struct RpcQueryKey { +pub struct RpcQueryKey { /// unix epoch time /// for the time series db, this is (close to) the time that the response was sent /// for the account database, this is rounded to the week @@ -167,15 +168,15 @@ impl RpcQueryStats { #[derive(Default)] pub struct BufferedRpcQueryStats { - frontend_requests: u64, - backend_requests: u64, - backend_retries: u64, - no_servers: u64, - cache_misses: u64, - cache_hits: u64, - sum_request_bytes: u64, - sum_response_bytes: u64, - sum_response_millis: u64, + pub frontend_requests: u64, + pub backend_requests: u64, + pub backend_retries: u64, + pub no_servers: u64, + pub cache_misses: u64, + pub cache_hits: u64, + pub sum_request_bytes: u64, + pub sum_response_bytes: u64, + pub sum_response_millis: u64, } /// A stat that we aggregate and then store in a database. 
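// Illustration of why the fields above were made pub: the stats migration can then fill a
// buffered stat directly from an old rpc_accounting row, roughly as sketched (and commented
// out) in migrate_stats_to_v2.rs; `x` is a hypothetical row from the old table:
//
// let val = BufferedRpcQueryStats {
//     frontend_requests: x.frontend_requests,
//     backend_requests: x.backend_requests,
//     backend_retries: x.backend_retries,
//     no_servers: x.no_servers,
//     cache_misses: x.cache_misses,
//     cache_hits: x.cache_hits,
//     sum_request_bytes: x.sum_request_bytes,
//     sum_response_bytes: x.sum_response_bytes,
//     sum_response_millis: x.sum_response_millis,
// };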
@@ -363,7 +364,7 @@ impl RpcQueryStats { method: String, authorization: Arc, metadata: Arc, - response_bytes: usize, + response_bytes: usize ) -> Self { // TODO: try_unwrap the metadata to be sure that all the stats for this request have been collected // TODO: otherwise, i think the whole thing should be in a single lock that we can "reset" when a stat is created @@ -389,11 +390,13 @@ impl RpcQueryStats { response_timestamp, } } + } impl StatBuffer { pub fn try_spawn( chain_id: u64, + bucket: String, db_conn: Option, influxdb_client: Option, db_save_interval_seconds: u32, @@ -418,7 +421,7 @@ impl StatBuffer { // any errors inside this task will cause the application to exit let handle = tokio::spawn(async move { - new.aggregate_and_save_loop(stat_receiver, shutdown_receiver) + new.aggregate_and_save_loop(bucket, stat_receiver, shutdown_receiver) .await }); @@ -427,6 +430,7 @@ impl StatBuffer { async fn aggregate_and_save_loop( &mut self, + bucket: String, stat_receiver: flume::Receiver, mut shutdown_receiver: broadcast::Receiver<()>, ) -> anyhow::Result<()> { @@ -440,6 +444,9 @@ impl StatBuffer { let mut opt_in_timeseries_buffer = HashMap::::new(); let mut accounting_db_buffer = HashMap::::new(); + // TODO: Somewhere here we should probably be updating the balance of the user + // And also update the credits used etc. for the referred user + loop { tokio::select! { stat = stat_receiver.recv_async() => { @@ -486,14 +493,14 @@ impl StatBuffer { for (key, stat) in global_timeseries_buffer.drain() { // TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now - if let Err(err) = stat.save_timeseries("dev_web3_proxy", "global_proxy", self.chain_id, influxdb_client, key).await { + if let Err(err) = stat.save_timeseries(bucket.clone().as_ref(), "global_proxy", self.chain_id, influxdb_client, key).await { error!("unable to save global stat! err={:?}", err); }; } for (key, stat) in opt_in_timeseries_buffer.drain() { // TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now - if let Err(err) = stat.save_timeseries("dev_web3_proxy", "opt_in_proxy", self.chain_id, influxdb_client, key).await { + if let Err(err) = stat.save_timeseries(bucket.clone().as_ref(), "opt_in_proxy", self.chain_id, influxdb_client, key).await { error!("unable to save opt-in stat! err={:?}", err); }; } @@ -538,7 +545,7 @@ impl StatBuffer { for (key, stat) in global_timeseries_buffer.drain() { if let Err(err) = stat .save_timeseries( - "dev_web3_proxy", + &bucket, "global_proxy", self.chain_id, influxdb_client, @@ -561,7 +568,7 @@ impl StatBuffer { for (key, stat) in opt_in_timeseries_buffer.drain() { if let Err(err) = stat .save_timeseries( - "dev_web3_proxy", + &bucket, "opt_in_proxy", self.chain_id, influxdb_client,