From 600c1bafb404b00d417e0e07e92a7e4a74df14f0 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 17 Jul 2023 13:31:41 -0700 Subject: [PATCH] David/multiple proxy test (#187) * add test_multiple_proxies_stats_add_up * make a premium user using both proxies * added a couple clones, must add constraints now to run for multiple-proxies (check arithmetic) * lint and code review * fix comment * fix tests (now fails at todo) * will introduce endpoint to fetch rpc stats from mysql * added influxdb to tests, should next do asserst in stats collected by influx, and mysql for multi-proxy * created test where influx and mysql have separate data, should help with debugging * forgot to drop influx * tests pass except multi-proxy * test passes, will check out nothing broke elswhere * go back to numbers * some linting * linting * removed redundant info! * responding to PR comments * ULID as instance-hash for the tag in influx (for anti-dup) --------- Co-authored-by: yenicelik --- web3_proxy/src/app/mod.rs | 5 + web3_proxy/src/config.rs | 2 +- web3_proxy/src/frontend/mod.rs | 10 +- .../src/frontend/users/authentication.rs | 2 +- web3_proxy/src/frontend/users/stats.rs | 39 ++- web3_proxy/src/stats/influxdb_queries.rs | 18 +- web3_proxy/src/stats/mod.rs | 14 +- web3_proxy/src/stats/stat_buffer.rs | 7 +- .../src/sub_commands/migrate_stats_to_v2.rs | 3 + web3_proxy/tests/common/app.rs | 24 +- .../common/create_provider_with_rpc_key.rs | 11 + web3_proxy/tests/common/influx.rs | 184 +++++++++++ web3_proxy/tests/common/mod.rs | 3 + web3_proxy/tests/common/mysql.rs | 4 +- web3_proxy/tests/common/stats_accounting.rs | 100 ++++++ web3_proxy/tests/test_admins.rs | 2 +- web3_proxy/tests/test_multiple_proxy.rs | 298 ++++++++++++++++++ web3_proxy/tests/test_proxy.rs | 4 +- web3_proxy/tests/test_sum_credits_used.rs | 5 +- web3_proxy/tests/test_users.rs | 12 +- 20 files changed, 713 insertions(+), 34 deletions(-) create mode 100644 web3_proxy/tests/common/create_provider_with_rpc_key.rs create mode 100644 web3_proxy/tests/common/influx.rs create mode 100644 web3_proxy/tests/common/stats_accounting.rs create mode 100644 web3_proxy/tests/test_multiple_proxy.rs diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 537fe621..872214d9 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -55,6 +55,7 @@ use tokio::sync::{broadcast, mpsc, oneshot, watch, Semaphore}; use tokio::task::JoinHandle; use tokio::time::{sleep, timeout}; use tracing::{error, info, trace, warn, Level}; +use ulid::Ulid; // TODO: make this customizable? // TODO: include GIT_REF in here. i had trouble getting https://docs.rs/vergen/latest/vergen/ to work with a workspace. also .git is in .dockerignore @@ -319,6 +320,9 @@ impl Web3ProxyApp { .build() .into(); + // Generate the instance name (hostname + random hash) + let instance_hash = Ulid::new().to_string(); + // create a channel for receiving stats // we do this in a channel so we don't slow down our response to the users // stats can be saved in mysql, influxdb, both, or none @@ -334,6 +338,7 @@ impl Web3ProxyApp { 1, flush_stat_buffer_sender.clone(), flush_stat_buffer_receiver, + instance_hash, )? { // since the database entries are used for accounting, we want to be sure everything is saved before exiting important_background_handles.push(spawned_stat_buffer.background_handle); diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 5fe5a79c..564b7388 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -177,7 +177,7 @@ pub struct AppConfig { pub volatile_redis_url: Option, /// maximum size of the connection pool for the cache - /// If none, the minimum * 2 is used + /// If none, workers * 2 is used pub volatile_redis_max_connections: Option, /// influxdb host for stats diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index 1e2727e0..18456d4e 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -213,15 +213,19 @@ pub async fn serve( .route("/user/revert_logs", get(users::stats::user_revert_logs_get)) .route( "/user/stats/aggregate", - get(users::stats::user_stats_aggregated_get), + get(users::stats::user_influx_stats_aggregated_get), ) .route( "/user/stats/aggregated", - get(users::stats::user_stats_aggregated_get), + get(users::stats::user_influx_stats_aggregated_get), + ) + .route( + "/user/stats/accounting", + get(users::stats::user_mysql_stats_get), ) .route( "/user/stats/detailed", - get(users::stats::user_stats_detailed_get), + get(users::stats::user_influx_stats_detailed_get), ) .route( "/user/logout", diff --git a/web3_proxy/src/frontend/users/authentication.rs b/web3_proxy/src/frontend/users/authentication.rs index 1d8975f4..eb4d5955 100644 --- a/web3_proxy/src/frontend/users/authentication.rs +++ b/web3_proxy/src/frontend/users/authentication.rs @@ -50,7 +50,7 @@ pub struct PostLogin { pub referral_code: Option, } -#[derive(Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize)] pub struct LoginPostResponse { pub bearer_token: UserBearerToken, pub rpc_keys: BTreeMap, diff --git a/web3_proxy/src/frontend/users/stats.rs b/web3_proxy/src/frontend/users/stats.rs index 8f55bf64..ee5fd158 100644 --- a/web3_proxy/src/frontend/users/stats.rs +++ b/web3_proxy/src/frontend/users/stats.rs @@ -5,7 +5,7 @@ use crate::globals::global_db_replica_conn; use crate::http_params::{ get_chain_id_from_params, get_page_from_params, get_query_start_from_params, }; -use crate::stats::influxdb_queries::query_user_stats; +use crate::stats::influxdb_queries::query_user_influx_stats; use crate::stats::StatType; use axum::{ extract::Query, @@ -16,13 +16,14 @@ use axum::{ use axum_macros::debug_handler; use entities; use entities::sea_orm_active_enums::Role; -use entities::{revert_log, rpc_key, secondary_user}; +use entities::{revert_log, rpc_accounting_v2, rpc_key, secondary_user}; use hashbrown::HashMap; use migration::sea_orm::{ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder}; use serde::Serialize; use serde_json::json; use std::collections::HashSet; use std::sync::Arc; +use tracing::info; /// `GET /user/revert_logs` -- Use a bearer token to get the user's revert logs. #[debug_handler] @@ -123,16 +124,42 @@ pub async fn user_revert_logs_get( /// `GET /user/stats/aggregate` -- Public endpoint for aggregate stats such as bandwidth used and methods requested. #[debug_handler] -pub async fn user_stats_aggregated_get( +pub async fn user_influx_stats_aggregated_get( Extension(app): Extension>, bearer: Option>>, Query(params): Query>, ) -> Web3ProxyResponse { - let response = query_user_stats(&app, bearer, ¶ms, StatType::Aggregated).await?; + let response = query_user_influx_stats(&app, bearer, ¶ms, StatType::Aggregated).await?; Ok(response) } +/// `GET /user/stats/accounting` -- Use a bearer token to get the user's revert logs. +#[debug_handler] +pub async fn user_mysql_stats_get( + Extension(app): Extension>, + TypedHeader(Authorization(bearer)): TypedHeader>, +) -> Web3ProxyResponse { + let user = app.bearer_is_authorized(bearer).await?; + let db_replica = global_db_replica_conn().await?; + + // Fetch everything from mysql, joined + let stats = rpc_key::Entity::find() + .filter(rpc_key::Column::UserId.eq(user.id)) + .find_with_related(rpc_accounting_v2::Entity) + .all(db_replica.as_ref()) + .await?; + + let stats = stats.into_iter().map(|x| x.1).flatten().collect::>(); + + let mut response = HashMap::new(); + response.insert("stats", stats); + + info!("Response is: {:?}", response); + + Ok(Json(response).into_response()) +} + /// `GET /user/stats/detailed` -- Use a bearer token to get the user's key stats such as bandwidth used and methods requested. /// /// If no bearer is provided, detailed stats for all users will be shown. @@ -143,12 +170,12 @@ pub async fn user_stats_aggregated_get( /// /// TODO: this will change as we add better support for secondary users. #[debug_handler] -pub async fn user_stats_detailed_get( +pub async fn user_influx_stats_detailed_get( Extension(app): Extension>, bearer: Option>>, Query(params): Query>, ) -> Web3ProxyResponse { - let response = query_user_stats(&app, bearer, ¶ms, StatType::Detailed).await?; + let response = query_user_influx_stats(&app, bearer, ¶ms, StatType::Detailed).await?; Ok(response) } diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs index f801f9da..7ad0b9cf 100644 --- a/web3_proxy/src/stats/influxdb_queries.rs +++ b/web3_proxy/src/stats/influxdb_queries.rs @@ -23,10 +23,10 @@ use influxdb2::api::query::FluxRecord; use influxdb2::models::Query; use migration::sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; use serde_json::json; -use tracing::{debug, error, trace, warn}; +use tracing::{error, info, trace, warn}; use ulid::Ulid; -pub async fn query_user_stats<'a>( +pub async fn query_user_influx_stats<'a>( app: &'a Web3ProxyApp, bearer: Option>>, params: &'a HashMap, @@ -250,7 +250,7 @@ pub async fn query_user_stats<'a>( |> filter(fn: (r) => r._measurement == "{measurement}") cumsum = base() - |> filter(fn: (r) => r._field == "backend_requests" or r._field == "cache_hits" or r._field == "cache_misses" or r._field == "frontend_requests" or r._field == "no_servers" or r._field == "sum_credits_used" or r._field == "sum_request_bytes" or r._field == "sum_response_bytes" or r._field == "sum_response_millis") + |> filter(fn: (r) => r._field == "backend_requests" or r._field == "cache_hits" or r._field == "cache_misses" or r._field == "frontend_requests" or r._field == "no_servers" or r._field == "sum_incl_free_credits_used" or r._field == "sum_credits_used" or r._field == "sum_request_bytes" or r._field == "sum_response_bytes" or r._field == "sum_response_millis") |> group(columns: {group_keys}) |> aggregateWindow(every: {query_window_seconds}s, fn: sum, createEmpty: false) |> drop(columns: ["_start", "_stop"]) @@ -290,8 +290,7 @@ pub async fn query_user_stats<'a>( )); } - // TODO: lower log level - debug!("Raw query to db is: {:#}", query); + trace!("Raw query to db is: {:#}", query); let query = Query::new(query.to_string()); trace!(?query, "influx"); @@ -431,6 +430,15 @@ pub async fn query_user_stats<'a>( error!("sum_credits_used should always be a Double!"); } } + } else if key == "sum_incl_free_credits_used" { + match value { + influxdb2_structmap::value::Value::Double(inner) => { + out.insert("total_incl_free_credits_used", json!(f64::from(inner))); + } + _ => { + error!("sum_incl_free_credits_used should always be a Double!"); + } + } } else if key == "sum_request_bytes" { match value { influxdb2_structmap::value::Value::Long(inner) => { diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index da892a48..8cfa432c 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -16,6 +16,9 @@ use axum::headers::Origin; use chrono::{DateTime, Months, TimeZone, Utc}; use derive_more::From; use entities::{referee, referrer, rpc_accounting_v2}; +use ethers::prelude::rand; +use ethers::prelude::rand::distributions::Alphanumeric; +use ethers::prelude::rand::Rng; use influxdb2::models::DataPoint; use migration::sea_orm::prelude::Decimal; use migration::sea_orm::{ @@ -479,6 +482,7 @@ impl BufferedRpcQueryStats { measurement: &str, chain_id: u64, key: RpcQueryKey, + instance: &String, ) -> anyhow::Result { let mut builder = DataPoint::builder(measurement); @@ -488,6 +492,8 @@ impl BufferedRpcQueryStats { builder = builder.tag("rpc_secret_key_id", rpc_secret_key_id.to_string()); } + builder = builder.tag("instance", instance); + builder = builder.tag("method", key.method); builder = builder @@ -503,11 +509,17 @@ impl BufferedRpcQueryStats { .field("sum_response_millis", self.sum_response_millis as i64) .field("sum_response_bytes", self.sum_response_bytes as i64) .field( - "sum_credits_used", + "sum_incl_free_credits_used", self.sum_credits_used .to_f64() .context("sum_credits_used is really (too) large")?, ) + .field( + "sum_credits_used", + self.paid_credits_used + .to_f64() + .context("sum_credits_used is really (too) large")?, + ) .field( "balance", self.approximate_balance_remaining diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs index a2a88f6d..5af9473b 100644 --- a/web3_proxy/src/stats/stat_buffer.rs +++ b/web3_proxy/src/stats/stat_buffer.rs @@ -56,6 +56,7 @@ pub struct StatBuffer { user_balance_cache: UserBalanceCache, _flush_sender: mpsc::Sender>, + instance_hash: String, } impl StatBuffer { @@ -72,6 +73,7 @@ impl StatBuffer { tsdb_save_interval_seconds: u32, flush_sender: mpsc::Sender>, flush_receiver: mpsc::Receiver>, + instance_hash: String, ) -> anyhow::Result> { if influxdb_bucket.is_none() { influxdb_client = None; @@ -95,6 +97,7 @@ impl StatBuffer { tsdb_save_interval_seconds, user_balance_cache: user_balance_cache.unwrap(), _flush_sender: flush_sender, + instance_hash, }; // any errors inside this task will cause the application to exit @@ -323,7 +326,7 @@ impl StatBuffer { for (key, stat) in self.global_timeseries_buffer.drain() { // TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now match stat - .build_timeseries_point("global_proxy", self.chain_id, key) + .build_timeseries_point("global_proxy", self.chain_id, key, &self.instance_hash) .await { Ok(point) => { @@ -338,7 +341,7 @@ impl StatBuffer { for (key, stat) in self.opt_in_timeseries_buffer.drain() { // TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now match stat - .build_timeseries_point("opt_in_proxy", self.chain_id, key) + .build_timeseries_point("opt_in_proxy", self.chain_id, key, &self.instance_hash) .await { Ok(point) => { diff --git a/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs b/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs index 016c04dd..4fed909d 100644 --- a/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs +++ b/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs @@ -74,6 +74,8 @@ impl MigrateStatsToV2SubCommand { let (flush_sender, flush_receiver) = mpsc::channel(1); + let instance_hash = Ulid::new().to_string(); + // Spawn the stat-sender let emitter_spawn = StatBuffer::try_spawn( BILLING_PERIOD_SECONDS, @@ -87,6 +89,7 @@ impl MigrateStatsToV2SubCommand { 1, flush_sender, flush_receiver, + instance_hash, ) .context("Error spawning stat buffer")? .context("No stat buffer spawned. Maybe missing influx or db credentials?")?; diff --git a/web3_proxy/tests/common/app.rs b/web3_proxy/tests/common/app.rs index 13e10027..762dec90 100644 --- a/web3_proxy/tests/common/app.rs +++ b/web3_proxy/tests/common/app.rs @@ -1,4 +1,5 @@ use super::{anvil::TestAnvil, mysql::TestMysql}; +use crate::common::influx::TestInflux; use ethers::{ prelude::{Http, Provider}, types::Address, @@ -43,7 +44,11 @@ pub struct TestApp { } impl TestApp { - pub async fn spawn(anvil: &TestAnvil, db: Option<&TestMysql>) -> Self { + pub async fn spawn( + anvil: &TestAnvil, + db: Option<&TestMysql>, + influx: Option<&TestInflux>, + ) -> Self { let chain_id = anvil.instance.chain_id(); let num_workers = 2; @@ -54,12 +59,26 @@ impl TestApp { let db_url = db.map(|x| x.url.clone()); + let (influx_host, influx_org, influx_token, influx_bucket) = match influx { + None => (None, None, None, None), + Some(x) => ( + Some(x.host.clone()), + Some(x.org.clone()), + Some(x.token.clone()), + Some(x.bucket.clone()), + ), + }; + // make a test TopConfig // TODO: test influx // TODO: test redis let app_config: AppConfig = serde_json::from_value(json!({ "chain_id": chain_id, "db_url": db_url, + "influxdb_host": influx_host, + "influxdb_org": influx_org, + "influxdb_token": influx_token, + "influxdb_bucket": influx_bucket, "default_user_max_requests_per_period": Some(6_000_000), "deposit_factory_contract": Address::from_str( "4e3BC2054788De923A04936C6ADdB99A05B0Ea36", @@ -72,6 +91,8 @@ impl TestApp { })) .unwrap(); + info!("App Config is: {:?}", app_config); + let top_config = TopConfig { app: app_config, balanced_rpcs: HashMap::from([( @@ -82,6 +103,7 @@ impl TestApp { ..Default::default() }, )]), + // influxdb_client: influx.map(|x| x.client), private_rpcs: None, bundler_4337_rpcs: None, extra: Default::default(), diff --git a/web3_proxy/tests/common/create_provider_with_rpc_key.rs b/web3_proxy/tests/common/create_provider_with_rpc_key.rs new file mode 100644 index 00000000..1d815787 --- /dev/null +++ b/web3_proxy/tests/common/create_provider_with_rpc_key.rs @@ -0,0 +1,11 @@ +use ulid::Ulid; +use url::Url; +use web3_proxy::rpcs::provider::EthersHttpProvider; + +#[allow(unused)] +pub async fn create_provider_for_user(url: &Url, user_secret_key: &Ulid) -> EthersHttpProvider { + // Then generate a provider + let proxy_endpoint = format!("{}rpc/{}", url, user_secret_key); + + EthersHttpProvider::try_from(proxy_endpoint).unwrap() +} diff --git a/web3_proxy/tests/common/influx.rs b/web3_proxy/tests/common/influx.rs new file mode 100644 index 00000000..a75f7648 --- /dev/null +++ b/web3_proxy/tests/common/influx.rs @@ -0,0 +1,184 @@ +use ethers::prelude::rand::{self, distributions::Alphanumeric, Rng}; +use influxdb2::Client; +use migration::sea_orm::DatabaseConnection; +use std::process::Command as SyncCommand; +use std::time::Duration; +use tokio::{ + net::TcpStream, + process::Command as AsyncCommand, + time::{sleep, Instant}, +}; +use tracing::{info, trace, warn}; +use web3_proxy::relational_db::{connect_db, get_migrated_db}; + +/// on drop, the mysql docker container will be shut down +#[derive(Debug)] +pub struct TestInflux { + pub host: String, + pub org: String, + pub token: String, + pub bucket: String, + pub container_name: String, + pub client: Client, +} + +impl TestInflux { + #[allow(unused)] + pub async fn spawn() -> Self { + let random: String = rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(8) + .map(char::from) + .collect(); + + let db_container_name = format!("web3-proxy-test-influx-{}", random); + + info!(%db_container_name); + + // docker run -d -p 8086:8086 \ + // --name influxdb2 \ + // -v $PWD/data:/var/lib/influxdb2 \ + // -v $PWD/config:/etc/influxdb2 \ + // -e DOCKER_INFLUXDB_INIT_MODE=setup \ + // -e DOCKER_INFLUXDB_INIT_USERNAME=root \ + // -e DOCKER_INFLUXDB_INIT_PASSWORD=secret-password \ + // -e DOCKER_INFLUXDB_INIT_ORG=my-init-org \ + // -e DOCKER_INFLUXDB_INIT_BUCKET=my-init-bucket \ + // -e DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=secret-token \ + + let username = "dev_web3_proxy"; + let password = "dev_web3_proxy"; + let org = "dev_org"; + let init_bucket = "dev_web3_proxy"; + let admin_token = "dev_web3_proxy_auth_token"; + + let cmd = AsyncCommand::new("docker") + .args([ + "run", + "--name", + &db_container_name, + "--rm", + "-d", + "-e", + "DOCKER_INFLUXDB_INIT_MODE=setup", + "-e", + &format!("DOCKER_INFLUXDB_INIT_USERNAME={}", username), + "-e", + &format!("DOCKER_INFLUXDB_INIT_PASSWORD={}", password), + "-e", + &format!("DOCKER_INFLUXDB_INIT_ORG={}", org), + "-e", + &format!("DOCKER_INFLUXDB_INIT_BUCKET={}", init_bucket), + "-e", + &format!("DOCKER_INFLUXDB_INIT_ADMIN_TOKEN={}", admin_token), + "-p", + "0:8086", + "influxdb:2.6.1-alpine", + ]) + .output() + .await + .expect("failed to start influx"); + + // original port 18086 + info!("Creation command is: {:?}", cmd); + + // give the db a second to start + // TODO: wait until docker says it is healthy + sleep(Duration::from_secs(1)).await; + + let docker_inspect_output = AsyncCommand::new("docker") + .args(["inspect", &db_container_name]) + .output() + .await + .unwrap(); + + info!(?docker_inspect_output); + + let docker_inspect_json = String::from_utf8(docker_inspect_output.stdout).unwrap(); + + info!(%docker_inspect_json); + + let docker_inspect_json: serde_json::Value = + serde_json::from_str(&docker_inspect_json).unwrap(); + + let influx_ports = docker_inspect_json + .get(0) + .unwrap() + .get("NetworkSettings") + .unwrap() + .get("Ports") + .unwrap() + .get("8086/tcp") + .unwrap() + .get(0) + .unwrap(); + + trace!(?influx_ports); + + let influx_port: u64 = influx_ports + .get("HostPort") + .expect("unable to determine influx port") + .as_str() + .unwrap() + .parse() + .unwrap(); + + let influx_ip = influx_ports + .get("HostIp") + .and_then(|x| x.as_str()) + .expect("unable to determine influx ip"); + info!("Influx IP is: {:?}", influx_ip); + + // let host = "http://localhost:8086"; + let host = format!("http://{}:{}", influx_ip, influx_port); + + // Create the client ... + let influxdb_client = influxdb2::Client::new(host.clone(), org, admin_token); + info!("Influx client is: {:?}", influxdb_client); + + // create the db_data as soon as the url is known + // when this is dropped, the db will be stopped + let mut test_influx = Self { + host: host.to_string(), + org: org.to_string(), + token: admin_token.to_string(), + bucket: init_bucket.to_string(), + container_name: db_container_name.clone(), + client: influxdb_client, + }; + + let start = Instant::now(); + let max_wait = Duration::from_secs(5); + loop { + if start.elapsed() > max_wait { + panic!("db took too long to start"); + } + + if TcpStream::connect(format!("{}:{}", influx_ip, influx_port)) + .await + .is_ok() + { + break; + }; + + // not open wait. sleep and then try again + sleep(Duration::from_secs(1)).await; + } + + sleep(Duration::from_secs(1)).await; + + info!(?test_influx, elapsed=%start.elapsed().as_secs_f32(), "influx post is open. Migrating now..."); + + test_influx + } +} + +impl Drop for TestInflux { + fn drop(&mut self) { + info!(%self.container_name, "killing influx"); + + let _ = SyncCommand::new("docker") + .args(["kill", "-s", "9", &self.container_name]) + .output(); + } +} diff --git a/web3_proxy/tests/common/mod.rs b/web3_proxy/tests/common/mod.rs index b7c5c24a..b76b0cc3 100644 --- a/web3_proxy/tests/common/mod.rs +++ b/web3_proxy/tests/common/mod.rs @@ -3,10 +3,13 @@ pub mod admin_increases_balance; pub mod anvil; pub mod app; pub mod create_admin; +pub mod create_provider_with_rpc_key; pub mod create_user; +pub mod influx; pub mod mysql; pub mod referral; pub mod rpc_key; +pub mod stats_accounting; pub mod user_balance; pub use self::app::TestApp; diff --git a/web3_proxy/tests/common/mysql.rs b/web3_proxy/tests/common/mysql.rs index 13fe144d..ed3bc84e 100644 --- a/web3_proxy/tests/common/mysql.rs +++ b/web3_proxy/tests/common/mysql.rs @@ -17,9 +17,8 @@ pub struct TestMysql { } impl TestMysql { + #[allow(unused)] pub async fn spawn() -> Self { - // sqlite doesn't seem to work. our migrations are written for mysql - // so lets use docker to start mysql let password: String = rand::thread_rng() .sample_iter(&Alphanumeric) .take(16) @@ -66,7 +65,6 @@ impl TestMysql { // TODO: wait until docker says it is healthy sleep(Duration::from_secs(1)).await; - // TODO: why is this always empty?! let docker_inspect_output = AsyncCommand::new("docker") .args(["inspect", &db_container_name]) .output() diff --git a/web3_proxy/tests/common/stats_accounting.rs b/web3_proxy/tests/common/stats_accounting.rs new file mode 100644 index 00000000..cef67791 --- /dev/null +++ b/web3_proxy/tests/common/stats_accounting.rs @@ -0,0 +1,100 @@ +use crate::common::TestApp; +use serde_json::json; +use std::time::{SystemTime, UNIX_EPOCH}; +use tracing::{info, trace}; +use web3_proxy::frontend::users::authentication::LoginPostResponse; + +/// Get the user stats accounting + +/// Helper function to get the user's mysql (rpc_accounting_v2) stats +#[allow(unused)] +pub async fn user_get_mysql_stats( + x: &TestApp, + r: &reqwest::Client, + login_response: &LoginPostResponse, +) -> serde_json::Value { + let mysql_stats = format!("{}user/stats/accounting", x.proxy_provider.url()); + + let _stats_response = r + .get(mysql_stats) + .bearer_auth(login_response.bearer_token) + .send() + .await + .unwrap(); + trace!( + ?_stats_response, + "get stats for user #{}", + login_response.user.id + ); + assert_eq!(_stats_response.status(), 200); + let stats_response = _stats_response.json().await.unwrap(); + info!("stats_response: {:#}", json!(&stats_response)); + stats_response +} + +/// Helper function to get the user's balance +#[allow(unused)] +pub async fn user_get_influx_stats_detailed( + x: &TestApp, + r: &reqwest::Client, + login_response: &LoginPostResponse, +) -> serde_json::Value { + let stats_detailed = format!("{}user/stats/detailed", x.proxy_provider.url()); + + let _stats_response = r + .get(stats_detailed) + .bearer_auth(login_response.bearer_token) + .send() + .await + .unwrap(); + info!( + ?_stats_response, + "get stats for user #{}", login_response.user.id + ); + assert_eq!(_stats_response.status(), 200); + let stats_response = _stats_response.json().await.unwrap(); + info!("stats_response: {:#}", json!(&stats_response)); + stats_response +} + +#[allow(unused)] +pub async fn user_get_influx_stats_aggregated( + x: &TestApp, + r: &reqwest::Client, + login_response: &LoginPostResponse, + chain_id: u64, +) -> serde_json::Value { + let query_window_seconds = 300; + let chain_id = chain_id; + let start = SystemTime::now(); + let query_start = start + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_secs() + - 1200; + let stats_aggregated = format!( + "{}user/stats/aggregate?query_window_seconds={}&chain_id={}&query_start={}", + x.proxy_provider.url(), + query_window_seconds, + chain_id, + query_start + ); + + info!("Stats aggregated request is: {:?}", stats_aggregated); + + info!("Sending queries to influx"); + let _stats_response = r + .get(stats_aggregated) + .bearer_auth(login_response.bearer_token) + .send() + .await + .unwrap(); + info!( + ?_stats_response, + "get stats for user #{}", login_response.user.id + ); + assert_eq!(_stats_response.status(), 200); + let stats_response = _stats_response.json().await.unwrap(); + info!("stats_response: {:#}", json!(&stats_response)); + stats_response +} diff --git a/web3_proxy/tests/test_admins.rs b/web3_proxy/tests/test_admins.rs index c15869f0..18cb0695 100644 --- a/web3_proxy/tests/test_admins.rs +++ b/web3_proxy/tests/test_admins.rs @@ -29,7 +29,7 @@ async fn test_admin_grant_credits() { let db = TestMysql::spawn().await; - let x = TestApp::spawn(&a, Some(&db)).await; + let x = TestApp::spawn(&a, Some(&db), None).await; let r = reqwest::Client::builder() .timeout(Duration::from_secs(3)) diff --git a/web3_proxy/tests/test_multiple_proxy.rs b/web3_proxy/tests/test_multiple_proxy.rs new file mode 100644 index 00000000..7fa8f6ea --- /dev/null +++ b/web3_proxy/tests/test_multiple_proxy.rs @@ -0,0 +1,298 @@ +mod common; + +use crate::common::create_provider_with_rpc_key::create_provider_for_user; +use crate::common::influx::TestInflux; +use crate::common::rpc_key::user_get_first_rpc_key; +use crate::common::stats_accounting::{ + user_get_influx_stats_aggregated, user_get_influx_stats_detailed, user_get_mysql_stats, +}; +use crate::common::user_balance::user_get_balance; +use crate::common::{ + admin_increases_balance::admin_increase_balance, anvil::TestAnvil, + create_admin::create_user_as_admin, create_user::create_user, mysql::TestMysql, TestApp, +}; +use futures::future::{join_all, try_join_all}; +use rust_decimal::Decimal; +use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; +use tokio::time::sleep; +use tracing::{info, warn}; +use web3_proxy::rpcs::blockchain::ArcBlock; + +#[cfg_attr(not(feature = "tests-needing-docker"), ignore)] +#[test_log::test(tokio::test)] +async fn test_multiple_proxies_stats_add_up() { + let chain_id = 999_001_999; + let a = TestAnvil::spawn(chain_id).await; + + let db = TestMysql::spawn().await; + + let influx = TestInflux::spawn().await; + + let r = reqwest::Client::builder() + .timeout(Duration::from_secs(20)) + .build() + .unwrap(); + + // Since when do indices start with 1 + let x_0 = TestApp::spawn(&a, Some(&db), Some(&influx)).await; + let x_1 = TestApp::spawn(&a, Some(&db), Some(&influx)).await; + + // make a user and give them credits + let user_0_wallet = a.wallet(0); + let user_1_wallet = a.wallet(1); + let admin_wallet = a.wallet(2); + info!(?admin_wallet); + + // Login both users + let user_0_login = create_user(&x_0, &r, &user_0_wallet, None).await; + let user_1_login = create_user(&x_1, &r, &user_1_wallet, None).await; + let admin_login = create_user_as_admin(&x_0, &db, &r, &admin_wallet).await; + + // Load up balances + admin_increase_balance(&x_0, &r, &admin_login, &user_0_wallet, Decimal::from(1000)).await; + admin_increase_balance(&x_1, &r, &admin_login, &user_1_wallet, Decimal::from(2000)).await; + + let user_0_balance = user_get_balance(&x_0, &r, &user_0_login).await; + let user_1_balance = user_get_balance(&x_1, &r, &user_1_login).await; + + let user_0_balance_pre = user_0_balance.remaining(); + let user_1_balance_pre = user_1_balance.remaining(); + + assert_eq!(user_0_balance_pre, Decimal::from(1000)); + assert_eq!(user_1_balance_pre, Decimal::from(2000)); + + // Generate the proxies + let number_requests = 50; + let mut handles = Vec::new(); + + // Get the RPC key from the user + let user_0_secret_key = user_get_first_rpc_key(&x_0, &r, &user_0_login) + .await + .secret_key; + + let proxy_0_user_0_provider = + create_provider_for_user(x_0.proxy_provider.url(), &user_0_secret_key).await; + let proxy_1_user_0_provider = + create_provider_for_user(x_1.proxy_provider.url(), &user_0_secret_key).await; + + let proxy_0_user_0_provider = Arc::new(proxy_0_user_0_provider); + let proxy_1_user_0_provider = Arc::new(proxy_1_user_0_provider); + + warn!("Created users, generated providers"); + + info!("Proxy 1: {:?}", proxy_0_user_0_provider); + info!("Proxy 2: {:?}", proxy_1_user_0_provider); + + for _ in 0..number_requests { + // send 2 to proxy 0 user 0 + let proxy_0_user_0_provider_clone = proxy_0_user_0_provider.clone(); + handles.push(tokio::spawn(async move { + proxy_0_user_0_provider_clone + .request::<_, Option>("eth_getBlockByNumber", ("latest", false)) + .await + .unwrap() + .unwrap() + })); + + let proxy_0_user_0_provider_clone = proxy_0_user_0_provider.clone(); + handles.push(tokio::spawn(async move { + proxy_0_user_0_provider_clone + .request::<_, Option>("eth_getBlockByNumber", ("latest", false)) + .await + .unwrap() + .unwrap() + })); + + // send 1 to proxy 1 user 0 + let proxy_1_user_0_provider_clone = proxy_1_user_0_provider.clone(); + handles.push(tokio::spawn(async move { + proxy_1_user_0_provider_clone + .request::<_, Option>("eth_getBlockByNumber", ("latest", false)) + .await + .unwrap() + .unwrap() + })); + } + + try_join_all(handles).await.unwrap(); + + // Flush all stats here + // TODO: the test should maybe pause time so that stats definitely flush from our queries. + let flush_0_count = x_0.flush_stats().await.unwrap(); + let flush_1_count = x_1.flush_stats().await.unwrap(); + + // Wait a bit + sleep(Duration::from_secs(5)).await; + info!("Counts 0 are: {:?}", flush_0_count); + assert_eq!(flush_0_count.relational, 1); + assert_eq!(flush_0_count.timeseries, 2); + info!("Counts 1 are: {:?}", flush_1_count); + assert_eq!(flush_1_count.relational, 1); + assert_eq!(flush_1_count.timeseries, 2); + + // get stats now + // todo!("Need to validate all the stat accounting now"); + // Get the stats from here + let mysql_stats = user_get_mysql_stats(&x_0, &r, &user_0_login).await; + info!("mysql stats are: {:?}", mysql_stats); + + let influx_aggregate_stats = + user_get_influx_stats_aggregated(&x_0, &r, &user_0_login, chain_id).await; + info!( + "influx_aggregate_stats stats are: {:?}", + influx_aggregate_stats + ); + + // Get the balance + let user_0_balance_post = user_get_balance(&x_0, &r, &user_0_login).await; + let influx_stats = influx_aggregate_stats["result"].get(0).unwrap(); + let mysql_stats = mysql_stats["stats"].get(0).unwrap(); + + info!("Influx and mysql stats are"); + info!(?influx_stats); + info!(?mysql_stats); + + assert_eq!( + mysql_stats["error_response"], + influx_stats["error_response"] + ); + assert_eq!( + mysql_stats["archive_needed"], + influx_stats["archive_needed"] + ); + assert_eq!( + Decimal::from_str(&mysql_stats["chain_id"].to_string().replace('"', "")).unwrap(), + Decimal::from_str(&influx_stats["chain_id"].to_string().replace('"', "")).unwrap() + ); + assert_eq!( + Decimal::from_str(&mysql_stats["no_servers"].to_string()).unwrap(), + Decimal::from_str(&influx_stats["no_servers"].to_string()).unwrap() + ); + assert_eq!( + Decimal::from_str(&mysql_stats["cache_hits"].to_string()).unwrap(), + Decimal::from_str(&influx_stats["total_cache_hits"].to_string()).unwrap() + ); + assert_eq!( + Decimal::from_str(&mysql_stats["cache_misses"].to_string()).unwrap(), + Decimal::from_str(&influx_stats["total_cache_misses"].to_string()).unwrap() + ); + assert_eq!( + Decimal::from_str(&mysql_stats["frontend_requests"].to_string()).unwrap(), + Decimal::from_str(&influx_stats["total_frontend_requests"].to_string()).unwrap() + ); + assert_eq!( + Decimal::from_str(&mysql_stats["sum_credits_used"].to_string().replace('"', "")).unwrap(), + Decimal::from_str( + &influx_stats["total_credits_used"] + .to_string() + .replace('"', "") + ) + .unwrap() + ); + assert_eq!( + Decimal::from_str( + &mysql_stats["sum_incl_free_credits_used"] + .to_string() + .replace('"', "") + ) + .unwrap(), + Decimal::from_str( + &influx_stats["total_incl_free_credits_used"] + .to_string() + .replace('"', "") + ) + .unwrap() + ); + assert_eq!( + Decimal::from_str(&mysql_stats["sum_request_bytes"].to_string()).unwrap(), + Decimal::from_str(&influx_stats["total_request_bytes"].to_string()).unwrap() + ); + assert_eq!( + Decimal::from_str(&mysql_stats["sum_response_bytes"].to_string()).unwrap(), + Decimal::from_str( + &influx_stats["total_response_bytes"] + .to_string() + .replace('"', "") + ) + .unwrap() + ); + assert_eq!( + Decimal::from_str(&mysql_stats["sum_response_millis"].to_string()).unwrap(), + Decimal::from_str( + &influx_stats["total_response_millis"] + .to_string() + .replace('"', "") + ) + .unwrap() + ); + assert_eq!( + Decimal::from_str(&mysql_stats["backend_requests"].to_string()).unwrap(), + Decimal::from_str(&influx_stats["total_backend_requests"].to_string()).unwrap() + ); + + // We don't have gauges so we cant really fix this in influx. will get back to this later + // assert_eq!( + // Decimal::from(user_0_balance_post.remaining()), + // Decimal::from_str(&influx_stats["balance"].to_string()).unwrap() + // ); + + // The fields we skip for mysql + // backend_retries, id, no_servers, period_datetime, rpc_key_id + + // The fields we skip for influx + // collection, rpc_key, time, + + // let user_get_influx_stats_detailed = + // user_get_influx_stats_detailed(&x_0, &r, &user_0_login).await; + // info!( + // "user_get_influx_stats_detailed stats are: {:?}", + // user_get_influx_stats_detailed + // ); +} + +// Gotta compare stats with influx: +// "stats": [ +// { +// "archive_needed": false, +// "backend_requests": 2, +// "backend_retries": 0, +// "cache_hits": 148, +// "cache_misses": 2, +// "chain_id": 999001999, +// "error_response": false, +// "frontend_requests": 150, +// "id": 1, +// "no_servers": 0, +// "period_datetime": "2023-07-13T00:00:00Z", +// "rpc_key_id": 1, +// "sum_credits_used": "180.8000000000", +// "sum_incl_free_credits_used": "180.8000000000", +// "sum_request_bytes": 12433, +// "sum_response_bytes": 230533, +// "sum_response_millis": 194 +// } +// ] + +// influx +// "result": [ +// { +// "archive_needed": false, +// "balance": 939.6, +// "chain_id": "999001999", +// "collection": "opt-in", +// "error_response": false, +// "no_servers": 0, +// "rpc_key": "01H5E9HRZW2S73F1996KPKMYCE", +// "time": "2023-07-16 02:47:56 +00:00", +// "total_backend_requests": 1, +// "total_cache_hits": 49, +// "total_cache_misses": 1, +// "total_credits_used": 60.4, +// "total_frontend_requests": 50, +// "total_incl_free_credits_used": 60.4, +// "total_request_bytes": 4141, +// "total_response_bytes": 76841, +// "total_response_millis": 72 +// } diff --git a/web3_proxy/tests/test_proxy.rs b/web3_proxy/tests/test_proxy.rs index 3fc76a17..aede371c 100644 --- a/web3_proxy/tests/test_proxy.rs +++ b/web3_proxy/tests/test_proxy.rs @@ -16,7 +16,7 @@ async fn it_migrates_the_db() { let a = TestAnvil::spawn(31337).await; let db = TestMysql::spawn().await; - let x = TestApp::spawn(&a, Some(&db)).await; + let x = TestApp::spawn(&a, Some(&db), None).await; // we call flush stats more to be sure it works than because we expect it to save any stats x.flush_stats().await.unwrap(); @@ -26,7 +26,7 @@ async fn it_migrates_the_db() { async fn it_starts_and_stops() { let a = TestAnvil::spawn(31337).await; - let x = TestApp::spawn(&a, None).await; + let x = TestApp::spawn(&a, None, None).await; let anvil_provider = &a.provider; let proxy_provider = &x.proxy_provider; diff --git a/web3_proxy/tests/test_sum_credits_used.rs b/web3_proxy/tests/test_sum_credits_used.rs index 665defa2..782f07a9 100644 --- a/web3_proxy/tests/test_sum_credits_used.rs +++ b/web3_proxy/tests/test_sum_credits_used.rs @@ -19,7 +19,7 @@ async fn test_sum_credits_used() { let db = TestMysql::spawn().await; - let x = TestApp::spawn(&a, Some(&db)).await; + let x = TestApp::spawn(&a, Some(&db), None).await; let r = reqwest::Client::builder() .timeout(Duration::from_secs(3)) @@ -78,7 +78,8 @@ async fn test_sum_credits_used() { // flush stats let flushed = x.flush_stats().await.unwrap(); - assert_eq!(flushed.relational, 2, "relational"); + // TODO: this was 2 when we flushed stats for anon users. that was temporarily disabled. it should be turned back on once indexes are fixed + assert_eq!(flushed.relational, 1, "relational"); assert_eq!(flushed.timeseries, 0, "timeseries"); // Give user wallet $1000 diff --git a/web3_proxy/tests/test_users.rs b/web3_proxy/tests/test_users.rs index 12b085d9..46a0e99d 100644 --- a/web3_proxy/tests/test_users.rs +++ b/web3_proxy/tests/test_users.rs @@ -42,7 +42,7 @@ async fn test_log_in_and_out() { let db = TestMysql::spawn().await; - let x = TestApp::spawn(&a, Some(&db)).await; + let x = TestApp::spawn(&a, Some(&db), None).await; let r = reqwest::Client::new(); @@ -103,7 +103,7 @@ async fn test_admin_balance_increase() { let db = TestMysql::spawn().await; - let x = TestApp::spawn(&a, Some(&db)).await; + let x = TestApp::spawn(&a, Some(&db), None).await; let r = reqwest::Client::builder() .timeout(Duration::from_secs(20)) @@ -156,7 +156,7 @@ async fn test_user_balance_decreases() { let db = TestMysql::spawn().await; - let x = TestApp::spawn(&a, Some(&db)).await; + let x = TestApp::spawn(&a, Some(&db), None).await; let r = reqwest::Client::builder() .timeout(Duration::from_secs(20)) @@ -264,7 +264,7 @@ async fn test_referral_bonus_non_concurrent() { let db = TestMysql::spawn().await; - let x = TestApp::spawn(&a, Some(&db)).await; + let x = TestApp::spawn(&a, Some(&db), None).await; let r = reqwest::Client::builder() .timeout(Duration::from_secs(20)) @@ -413,7 +413,7 @@ async fn test_referral_bonus_concurrent_referrer_only() { let db = TestMysql::spawn().await; - let x = TestApp::spawn(&a, Some(&db)).await; + let x = TestApp::spawn(&a, Some(&db), None).await; let r = reqwest::Client::builder() .timeout(Duration::from_secs(20)) @@ -573,7 +573,7 @@ async fn test_referral_bonus_concurrent_referrer_and_user() { let db = TestMysql::spawn().await; - let x = TestApp::spawn(&a, Some(&db)).await; + let x = TestApp::spawn(&a, Some(&db), None).await; let r = reqwest::Client::builder() .timeout(Duration::from_secs(20))