David/multiple proxy test (#187)

* add test_multiple_proxies_stats_add_up

* make a premium user using both proxies

* added a couple of clones; must now add constraints to run for multiple proxies (check the arithmetic)

* lint and code review

* fix comment

* fix tests (now fails at todo)

* will introduce endpoint to fetch rpc stats from mysql

* added influxdb to tests; next, add asserts on the stats collected by influx and mysql for multi-proxy

* created test where influx and mysql have separate data, should help with debugging

* forgot to drop influx

* tests pass except multi-proxy

* test passes, will check that nothing broke elsewhere

* go back to numbers

* some linting

* linting

* removed redundant info!

* responding to PR comments

* ULID as instance-hash for the tag in influx (anti-dup: keeps points from different proxy instances from overwriting each other)
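
The anti-dup concern: influx silently overwrites a point that shares its measurement, tag set, and timestamp with an earlier one, so two proxies flushing stats in the same window would clobber each other. A minimal sketch of the idea, mirroring the builder calls in the diff below (field value abbreviated):

    // each proxy process generates one random ULID at startup
    let instance_hash = ulid::Ulid::new().to_string();

    // every flushed point carries it as a tag, so points from different
    // proxy instances never collide on (measurement, tag set, timestamp)
    let point = influxdb2::models::DataPoint::builder("global_proxy")
        .tag("instance", instance_hash.as_str())
        .field("frontend_requests", 150i64)
        .build()
        .expect("valid point");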

---------

Co-authored-by: yenicelik <david.yenicelik@gmail.com>
Bryan Stitt 2023-07-17 13:31:41 -07:00 committed by GitHub
parent fe672b5189
commit 600c1bafb4
20 changed files with 713 additions and 34 deletions

@@ -55,6 +55,7 @@ use tokio::sync::{broadcast, mpsc, oneshot, watch, Semaphore};
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout};
use tracing::{error, info, trace, warn, Level};
use ulid::Ulid;
// TODO: make this customizable?
// TODO: include GIT_REF in here. i had trouble getting https://docs.rs/vergen/latest/vergen/ to work with a workspace. also .git is in .dockerignore
@@ -319,6 +320,9 @@ impl Web3ProxyApp {
.build()
.into();
// Generate the instance hash (a random ULID) that tags this proxy's influx points
let instance_hash = Ulid::new().to_string();
// create a channel for receiving stats
// we do this in a channel so we don't slow down our response to the users
// stats can be saved in mysql, influxdb, both, or none
@@ -334,6 +338,7 @@ impl Web3ProxyApp {
1,
flush_stat_buffer_sender.clone(),
flush_stat_buffer_receiver,
instance_hash,
)? {
// since the database entries are used for accounting, we want to be sure everything is saved before exiting
important_background_handles.push(spawned_stat_buffer.background_handle);

@@ -177,7 +177,7 @@ pub struct AppConfig {
pub volatile_redis_url: Option<String>,
/// maximum size of the connection pool for the cache
/// If none, the minimum * 2 is used
/// If none, workers * 2 is used
pub volatile_redis_max_connections: Option<usize>,
/// influxdb host for stats

@@ -213,15 +213,19 @@ pub async fn serve(
.route("/user/revert_logs", get(users::stats::user_revert_logs_get))
.route(
"/user/stats/aggregate",
get(users::stats::user_stats_aggregated_get),
get(users::stats::user_influx_stats_aggregated_get),
)
.route(
"/user/stats/aggregated",
get(users::stats::user_stats_aggregated_get),
get(users::stats::user_influx_stats_aggregated_get),
)
.route(
"/user/stats/accounting",
get(users::stats::user_mysql_stats_get),
)
.route(
"/user/stats/detailed",
get(users::stats::user_stats_detailed_get),
get(users::stats::user_influx_stats_detailed_get),
)
.route(
"/user/logout",

@@ -50,7 +50,7 @@ pub struct PostLogin {
pub referral_code: Option<String>,
}
#[derive(Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct LoginPostResponse {
pub bearer_token: UserBearerToken,
pub rpc_keys: BTreeMap<u64, rpc_key::Model>,

@@ -5,7 +5,7 @@ use crate::globals::global_db_replica_conn;
use crate::http_params::{
get_chain_id_from_params, get_page_from_params, get_query_start_from_params,
};
use crate::stats::influxdb_queries::query_user_stats;
use crate::stats::influxdb_queries::query_user_influx_stats;
use crate::stats::StatType;
use axum::{
extract::Query,
@@ -16,13 +16,14 @@ use axum::{
use axum_macros::debug_handler;
use entities;
use entities::sea_orm_active_enums::Role;
use entities::{revert_log, rpc_key, secondary_user};
use entities::{revert_log, rpc_accounting_v2, rpc_key, secondary_user};
use hashbrown::HashMap;
use migration::sea_orm::{ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder};
use serde::Serialize;
use serde_json::json;
use std::collections::HashSet;
use std::sync::Arc;
use tracing::info;
/// `GET /user/revert_logs` -- Use a bearer token to get the user's revert logs.
#[debug_handler]
@@ -123,16 +124,42 @@ pub async fn user_revert_logs_get(
/// `GET /user/stats/aggregate` -- Public endpoint for aggregate stats such as bandwidth used and methods requested.
#[debug_handler]
pub async fn user_stats_aggregated_get(
pub async fn user_influx_stats_aggregated_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
bearer: Option<TypedHeader<Authorization<Bearer>>>,
Query(params): Query<HashMap<String, String>>,
) -> Web3ProxyResponse {
let response = query_user_stats(&app, bearer, &params, StatType::Aggregated).await?;
let response = query_user_influx_stats(&app, bearer, &params, StatType::Aggregated).await?;
Ok(response)
}
/// `GET /user/stats/accounting` -- Use a bearer token to get the user's accounting stats from mysql (rpc_accounting_v2).
#[debug_handler]
pub async fn user_mysql_stats_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
) -> Web3ProxyResponse {
let user = app.bearer_is_authorized(bearer).await?;
let db_replica = global_db_replica_conn().await?;
// Fetch everything from mysql, joined
let stats = rpc_key::Entity::find()
.filter(rpc_key::Column::UserId.eq(user.id))
.find_with_related(rpc_accounting_v2::Entity)
.all(db_replica.as_ref())
.await?;
let stats = stats.into_iter().flat_map(|x| x.1).collect::<Vec<_>>();
let mut response = HashMap::new();
response.insert("stats", stats);
info!("Response is: {:?}", response);
Ok(Json(response).into_response())
}
/// `GET /user/stats/detailed` -- Use a bearer token to get the user's key stats such as bandwidth used and methods requested.
///
/// If no bearer is provided, detailed stats for all users will be shown.
@@ -143,12 +170,12 @@ pub async fn user_stats_aggregated_get(
///
/// TODO: this will change as we add better support for secondary users.
#[debug_handler]
pub async fn user_stats_detailed_get(
pub async fn user_influx_stats_detailed_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
bearer: Option<TypedHeader<Authorization<Bearer>>>,
Query(params): Query<HashMap<String, String>>,
) -> Web3ProxyResponse {
let response = query_user_stats(&app, bearer, &params, StatType::Detailed).await?;
let response = query_user_influx_stats(&app, bearer, &params, StatType::Detailed).await?;
Ok(response)
}
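
For reference, a sketch of how a client would call the new mysql-backed route, mirroring the test helper added later in this diff (the base url/port and bearer_token are placeholders):

    // fetch the user's rpc_accounting_v2 rows; the handler responds with
    // {"stats": [...]} and requires a bearer token from login
    let r = reqwest::Client::new();
    let stats: serde_json::Value = r
        .get("http://127.0.0.1:8544/user/stats/accounting")
        .bearer_auth(bearer_token)
        .send()
        .await?
        .json()
        .await?;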

@@ -23,10 +23,10 @@ use influxdb2::api::query::FluxRecord;
use influxdb2::models::Query;
use migration::sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
use serde_json::json;
use tracing::{debug, error, trace, warn};
use tracing::{error, info, trace, warn};
use ulid::Ulid;
pub async fn query_user_stats<'a>(
pub async fn query_user_influx_stats<'a>(
app: &'a Web3ProxyApp,
bearer: Option<TypedHeader<Authorization<Bearer>>>,
params: &'a HashMap<String, String>,
@@ -250,7 +250,7 @@ pub async fn query_user_stats<'a>(
|> filter(fn: (r) => r._measurement == "{measurement}")
cumsum = base()
|> filter(fn: (r) => r._field == "backend_requests" or r._field == "cache_hits" or r._field == "cache_misses" or r._field == "frontend_requests" or r._field == "no_servers" or r._field == "sum_credits_used" or r._field == "sum_request_bytes" or r._field == "sum_response_bytes" or r._field == "sum_response_millis")
|> filter(fn: (r) => r._field == "backend_requests" or r._field == "cache_hits" or r._field == "cache_misses" or r._field == "frontend_requests" or r._field == "no_servers" or r._field == "sum_incl_free_credits_used" or r._field == "sum_credits_used" or r._field == "sum_request_bytes" or r._field == "sum_response_bytes" or r._field == "sum_response_millis")
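// sum_incl_free_credits_used is new here; sum_credits_used now counts only paid credits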
|> group(columns: {group_keys})
|> aggregateWindow(every: {query_window_seconds}s, fn: sum, createEmpty: false)
|> drop(columns: ["_start", "_stop"])
@@ -290,8 +290,7 @@ pub async fn query_user_stats<'a>(
));
}
// TODO: lower log level
debug!("Raw query to db is: {:#}", query);
trace!("Raw query to db is: {:#}", query);
let query = Query::new(query.to_string());
trace!(?query, "influx");
@@ -431,6 +430,15 @@ pub async fn query_user_stats<'a>(
error!("sum_credits_used should always be a Double!");
}
}
} else if key == "sum_incl_free_credits_used" {
match value {
influxdb2_structmap::value::Value::Double(inner) => {
out.insert("total_incl_free_credits_used", json!(f64::from(inner)));
}
_ => {
error!("sum_incl_free_credits_used should always be a Double!");
}
}
} else if key == "sum_request_bytes" {
match value {
influxdb2_structmap::value::Value::Long(inner) => {

@@ -16,6 +16,9 @@ use axum::headers::Origin;
use chrono::{DateTime, Months, TimeZone, Utc};
use derive_more::From;
use entities::{referee, referrer, rpc_accounting_v2};
use ethers::prelude::rand;
use ethers::prelude::rand::distributions::Alphanumeric;
use ethers::prelude::rand::Rng;
use influxdb2::models::DataPoint;
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::{
@@ -479,6 +482,7 @@ impl BufferedRpcQueryStats {
measurement: &str,
chain_id: u64,
key: RpcQueryKey,
instance: &String,
) -> anyhow::Result<DataPoint> {
let mut builder = DataPoint::builder(measurement);
@@ -488,6 +492,8 @@ impl BufferedRpcQueryStats {
builder = builder.tag("rpc_secret_key_id", rpc_secret_key_id.to_string());
}
builder = builder.tag("instance", instance);
builder = builder.tag("method", key.method);
builder = builder
@@ -503,11 +509,17 @@ impl BufferedRpcQueryStats {
.field("sum_response_millis", self.sum_response_millis as i64)
.field("sum_response_bytes", self.sum_response_bytes as i64)
.field(
"sum_credits_used",
"sum_incl_free_credits_used",
self.sum_credits_used
.to_f64()
.context("sum_credits_used is really (too) large")?,
)
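// note the split: "sum_incl_free_credits_used" reports all credits used
// (self.sum_credits_used), while "sum_credits_used" below now reports only
// the paid portion (self.paid_credits_used)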
.field(
"sum_credits_used",
self.paid_credits_used
.to_f64()
.context("sum_credits_used is really (too) large")?,
)
.field(
"balance",
self.approximate_balance_remaining

@@ -56,6 +56,7 @@ pub struct StatBuffer {
user_balance_cache: UserBalanceCache,
_flush_sender: mpsc::Sender<oneshot::Sender<FlushedStats>>,
instance_hash: String,
}
impl StatBuffer {
@@ -72,6 +73,7 @@ impl StatBuffer {
tsdb_save_interval_seconds: u32,
flush_sender: mpsc::Sender<oneshot::Sender<FlushedStats>>,
flush_receiver: mpsc::Receiver<oneshot::Sender<FlushedStats>>,
instance_hash: String,
) -> anyhow::Result<Option<SpawnedStatBuffer>> {
if influxdb_bucket.is_none() {
influxdb_client = None;
@@ -95,6 +97,7 @@ impl StatBuffer {
tsdb_save_interval_seconds,
user_balance_cache: user_balance_cache.unwrap(),
_flush_sender: flush_sender,
instance_hash,
};
// any errors inside this task will cause the application to exit
@@ -323,7 +326,7 @@ impl StatBuffer {
for (key, stat) in self.global_timeseries_buffer.drain() {
// TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
match stat
.build_timeseries_point("global_proxy", self.chain_id, key)
.build_timeseries_point("global_proxy", self.chain_id, key, &self.instance_hash)
.await
{
Ok(point) => {
@@ -338,7 +341,7 @@ impl StatBuffer {
for (key, stat) in self.opt_in_timeseries_buffer.drain() {
// TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
match stat
.build_timeseries_point("opt_in_proxy", self.chain_id, key)
.build_timeseries_point("opt_in_proxy", self.chain_id, key, &self.instance_hash)
.await
{
Ok(point) => {

@@ -74,6 +74,8 @@ impl MigrateStatsToV2SubCommand {
let (flush_sender, flush_receiver) = mpsc::channel(1);
let instance_hash = Ulid::new().to_string();
// Spawn the stat-sender
let emitter_spawn = StatBuffer::try_spawn(
BILLING_PERIOD_SECONDS,
@@ -87,6 +89,7 @@ impl MigrateStatsToV2SubCommand {
1,
flush_sender,
flush_receiver,
instance_hash,
)
.context("Error spawning stat buffer")?
.context("No stat buffer spawned. Maybe missing influx or db credentials?")?;

@@ -1,4 +1,5 @@
use super::{anvil::TestAnvil, mysql::TestMysql};
use crate::common::influx::TestInflux;
use ethers::{
prelude::{Http, Provider},
types::Address,
@@ -43,7 +44,11 @@ pub struct TestApp {
}
impl TestApp {
pub async fn spawn(anvil: &TestAnvil, db: Option<&TestMysql>) -> Self {
pub async fn spawn(
anvil: &TestAnvil,
db: Option<&TestMysql>,
influx: Option<&TestInflux>,
) -> Self {
let chain_id = anvil.instance.chain_id();
let num_workers = 2;
@@ -54,12 +59,26 @@ impl TestApp {
let db_url = db.map(|x| x.url.clone());
let (influx_host, influx_org, influx_token, influx_bucket) = match influx {
None => (None, None, None, None),
Some(x) => (
Some(x.host.clone()),
Some(x.org.clone()),
Some(x.token.clone()),
Some(x.bucket.clone()),
),
};
// make a test TopConfig
// TODO: test redis
let app_config: AppConfig = serde_json::from_value(json!({
"chain_id": chain_id,
"db_url": db_url,
"influxdb_host": influx_host,
"influxdb_org": influx_org,
"influxdb_token": influx_token,
"influxdb_bucket": influx_bucket,
"default_user_max_requests_per_period": Some(6_000_000),
"deposit_factory_contract": Address::from_str(
"4e3BC2054788De923A04936C6ADdB99A05B0Ea36",
@@ -72,6 +91,8 @@ impl TestApp {
}))
.unwrap();
info!("App Config is: {:?}", app_config);
let top_config = TopConfig {
app: app_config,
balanced_rpcs: HashMap::from([(
@@ -82,6 +103,7 @@ impl TestApp {
..Default::default()
},
)]),
// influxdb_client: influx.map(|x| x.client),
private_rpcs: None,
bundler_4337_rpcs: None,
extra: Default::default(),

@@ -0,0 +1,11 @@
use ulid::Ulid;
use url::Url;
use web3_proxy::rpcs::provider::EthersHttpProvider;
#[allow(unused)]
pub async fn create_provider_for_user(url: &Url, user_secret_key: &Ulid) -> EthersHttpProvider {
// build the proxy endpoint for this user's rpc secret key (the url already ends with a '/')
let proxy_endpoint = format!("{}rpc/{}", url, user_secret_key);
EthersHttpProvider::try_from(proxy_endpoint).unwrap()
}

@@ -0,0 +1,184 @@
use ethers::prelude::rand::{self, distributions::Alphanumeric, Rng};
use influxdb2::Client;
use migration::sea_orm::DatabaseConnection;
use std::process::Command as SyncCommand;
use std::time::Duration;
use tokio::{
net::TcpStream,
process::Command as AsyncCommand,
time::{sleep, Instant},
};
use tracing::{info, trace, warn};
use web3_proxy::relational_db::{connect_db, get_migrated_db};
/// on drop, the influx docker container will be shut down
#[derive(Debug)]
pub struct TestInflux {
pub host: String,
pub org: String,
pub token: String,
pub bucket: String,
pub container_name: String,
pub client: Client,
}
impl TestInflux {
#[allow(unused)]
pub async fn spawn() -> Self {
let random: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(8)
.map(char::from)
.collect();
let db_container_name = format!("web3-proxy-test-influx-{}", random);
info!(%db_container_name);
// docker run -d -p 8086:8086 \
// --name influxdb2 \
// -v $PWD/data:/var/lib/influxdb2 \
// -v $PWD/config:/etc/influxdb2 \
// -e DOCKER_INFLUXDB_INIT_MODE=setup \
// -e DOCKER_INFLUXDB_INIT_USERNAME=root \
// -e DOCKER_INFLUXDB_INIT_PASSWORD=secret-password \
// -e DOCKER_INFLUXDB_INIT_ORG=my-init-org \
// -e DOCKER_INFLUXDB_INIT_BUCKET=my-init-bucket \
// -e DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=secret-token \
let username = "dev_web3_proxy";
let password = "dev_web3_proxy";
let org = "dev_org";
let init_bucket = "dev_web3_proxy";
let admin_token = "dev_web3_proxy_auth_token";
let cmd = AsyncCommand::new("docker")
.args([
"run",
"--name",
&db_container_name,
"--rm",
"-d",
"-e",
"DOCKER_INFLUXDB_INIT_MODE=setup",
"-e",
&format!("DOCKER_INFLUXDB_INIT_USERNAME={}", username),
"-e",
&format!("DOCKER_INFLUXDB_INIT_PASSWORD={}", password),
"-e",
&format!("DOCKER_INFLUXDB_INIT_ORG={}", org),
"-e",
&format!("DOCKER_INFLUXDB_INIT_BUCKET={}", init_bucket),
"-e",
&format!("DOCKER_INFLUXDB_INIT_ADMIN_TOKEN={}", admin_token),
"-p",
"0:8086",
"influxdb:2.6.1-alpine",
])
.output()
.await
.expect("failed to start influx");
// original port 18086
info!("Creation command is: {:?}", cmd);
// give the db a second to start
// TODO: wait until docker says it is healthy
sleep(Duration::from_secs(1)).await;
let docker_inspect_output = AsyncCommand::new("docker")
.args(["inspect", &db_container_name])
.output()
.await
.unwrap();
info!(?docker_inspect_output);
let docker_inspect_json = String::from_utf8(docker_inspect_output.stdout).unwrap();
info!(%docker_inspect_json);
let docker_inspect_json: serde_json::Value =
serde_json::from_str(&docker_inspect_json).unwrap();
let influx_ports = docker_inspect_json
.get(0)
.unwrap()
.get("NetworkSettings")
.unwrap()
.get("Ports")
.unwrap()
.get("8086/tcp")
.unwrap()
.get(0)
.unwrap();
trace!(?influx_ports);
let influx_port: u64 = influx_ports
.get("HostPort")
.expect("unable to determine influx port")
.as_str()
.unwrap()
.parse()
.unwrap();
let influx_ip = influx_ports
.get("HostIp")
.and_then(|x| x.as_str())
.expect("unable to determine influx ip");
info!("Influx IP is: {:?}", influx_ip);
// let host = "http://localhost:8086";
let host = format!("http://{}:{}", influx_ip, influx_port);
// Create the client ...
let influxdb_client = influxdb2::Client::new(host.clone(), org, admin_token);
info!("Influx client is: {:?}", influxdb_client);
// create the test struct as soon as the url is known
// when this is dropped, the influx container will be stopped
let mut test_influx = Self {
host: host.to_string(),
org: org.to_string(),
token: admin_token.to_string(),
bucket: init_bucket.to_string(),
container_name: db_container_name.clone(),
client: influxdb_client,
};
let start = Instant::now();
let max_wait = Duration::from_secs(5);
loop {
if start.elapsed() > max_wait {
panic!("db took too long to start");
}
if TcpStream::connect(format!("{}:{}", influx_ip, influx_port))
.await
.is_ok()
{
break;
};
// not open yet. sleep and then try again
sleep(Duration::from_secs(1)).await;
}
sleep(Duration::from_secs(1)).await;
info!(?test_influx, elapsed=%start.elapsed().as_secs_f32(), "influx port is open");
test_influx
}
}
impl Drop for TestInflux {
fn drop(&mut self) {
info!(%self.container_name, "killing influx");
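// SIGKILL is enough for cleanup: the container was started with --rm,
// so docker removes it once it dies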
let _ = SyncCommand::new("docker")
.args(["kill", "-s", "9", &self.container_name])
.output();
}
}

@@ -3,10 +3,13 @@ pub mod admin_increases_balance;
pub mod anvil;
pub mod app;
pub mod create_admin;
pub mod create_provider_with_rpc_key;
pub mod create_user;
pub mod influx;
pub mod mysql;
pub mod referral;
pub mod rpc_key;
pub mod stats_accounting;
pub mod user_balance;
pub use self::app::TestApp;

@@ -17,9 +17,8 @@ pub struct TestMysql {
}
impl TestMysql {
#[allow(unused)]
pub async fn spawn() -> Self {
// sqlite doesn't seem to work. our migrations are written for mysql
// so lets use docker to start mysql
let password: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(16)
@@ -66,7 +65,6 @@ impl TestMysql {
// TODO: wait until docker says it is healthy
sleep(Duration::from_secs(1)).await;
// TODO: why is this always empty?!
let docker_inspect_output = AsyncCommand::new("docker")
.args(["inspect", &db_container_name])
.output()

@@ -0,0 +1,100 @@
use crate::common::TestApp;
use serde_json::json;
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::{info, trace};
use web3_proxy::frontend::users::authentication::LoginPostResponse;
/// Helper function to get the user's mysql (rpc_accounting_v2) stats
#[allow(unused)]
pub async fn user_get_mysql_stats(
x: &TestApp,
r: &reqwest::Client,
login_response: &LoginPostResponse,
) -> serde_json::Value {
let mysql_stats = format!("{}user/stats/accounting", x.proxy_provider.url());
let _stats_response = r
.get(mysql_stats)
.bearer_auth(login_response.bearer_token)
.send()
.await
.unwrap();
trace!(
?_stats_response,
"get stats for user #{}",
login_response.user.id
);
assert_eq!(_stats_response.status(), 200);
let stats_response = _stats_response.json().await.unwrap();
info!("stats_response: {:#}", json!(&stats_response));
stats_response
}
/// Helper function to get the user's detailed stats from influx
#[allow(unused)]
pub async fn user_get_influx_stats_detailed(
x: &TestApp,
r: &reqwest::Client,
login_response: &LoginPostResponse,
) -> serde_json::Value {
let stats_detailed = format!("{}user/stats/detailed", x.proxy_provider.url());
let _stats_response = r
.get(stats_detailed)
.bearer_auth(login_response.bearer_token)
.send()
.await
.unwrap();
info!(
?_stats_response,
"get stats for user #{}", login_response.user.id
);
assert_eq!(_stats_response.status(), 200);
let stats_response = _stats_response.json().await.unwrap();
info!("stats_response: {:#}", json!(&stats_response));
stats_response
}
#[allow(unused)]
pub async fn user_get_influx_stats_aggregated(
x: &TestApp,
r: &reqwest::Client,
login_response: &LoginPostResponse,
chain_id: u64,
) -> serde_json::Value {
let query_window_seconds = 300;
let start = SystemTime::now();
let query_start = start
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs()
- 1200;
let stats_aggregated = format!(
"{}user/stats/aggregate?query_window_seconds={}&chain_id={}&query_start={}",
x.proxy_provider.url(),
query_window_seconds,
chain_id,
query_start
);
info!("Stats aggregated request is: {:?}", stats_aggregated);
info!("Sending queries to influx");
let _stats_response = r
.get(stats_aggregated)
.bearer_auth(login_response.bearer_token)
.send()
.await
.unwrap();
info!(
?_stats_response,
"get stats for user #{}", login_response.user.id
);
assert_eq!(_stats_response.status(), 200);
let stats_response = _stats_response.json().await.unwrap();
info!("stats_response: {:#}", json!(&stats_response));
stats_response
}

@@ -29,7 +29,7 @@ async fn test_admin_grant_credits() {
let db = TestMysql::spawn().await;
let x = TestApp::spawn(&a, Some(&db)).await;
let x = TestApp::spawn(&a, Some(&db), None).await;
let r = reqwest::Client::builder()
.timeout(Duration::from_secs(3))

@@ -0,0 +1,298 @@
mod common;
use crate::common::create_provider_with_rpc_key::create_provider_for_user;
use crate::common::influx::TestInflux;
use crate::common::rpc_key::user_get_first_rpc_key;
use crate::common::stats_accounting::{
user_get_influx_stats_aggregated, user_get_influx_stats_detailed, user_get_mysql_stats,
};
use crate::common::user_balance::user_get_balance;
use crate::common::{
admin_increases_balance::admin_increase_balance, anvil::TestAnvil,
create_admin::create_user_as_admin, create_user::create_user, mysql::TestMysql, TestApp,
};
use futures::future::{join_all, try_join_all};
use rust_decimal::Decimal;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use tracing::{info, warn};
use web3_proxy::rpcs::blockchain::ArcBlock;
#[cfg_attr(not(feature = "tests-needing-docker"), ignore)]
#[test_log::test(tokio::test)]
async fn test_multiple_proxies_stats_add_up() {
let chain_id = 999_001_999;
let a = TestAnvil::spawn(chain_id).await;
let db = TestMysql::spawn().await;
let influx = TestInflux::spawn().await;
let r = reqwest::Client::builder()
.timeout(Duration::from_secs(20))
.build()
.unwrap();
// spawn two proxy instances that share the same mysql and influx
let x_0 = TestApp::spawn(&a, Some(&db), Some(&influx)).await;
let x_1 = TestApp::spawn(&a, Some(&db), Some(&influx)).await;
// make a user and give them credits
let user_0_wallet = a.wallet(0);
let user_1_wallet = a.wallet(1);
let admin_wallet = a.wallet(2);
info!(?admin_wallet);
// Login both users
let user_0_login = create_user(&x_0, &r, &user_0_wallet, None).await;
let user_1_login = create_user(&x_1, &r, &user_1_wallet, None).await;
let admin_login = create_user_as_admin(&x_0, &db, &r, &admin_wallet).await;
// Load up balances
admin_increase_balance(&x_0, &r, &admin_login, &user_0_wallet, Decimal::from(1000)).await;
admin_increase_balance(&x_1, &r, &admin_login, &user_1_wallet, Decimal::from(2000)).await;
let user_0_balance = user_get_balance(&x_0, &r, &user_0_login).await;
let user_1_balance = user_get_balance(&x_1, &r, &user_1_login).await;
let user_0_balance_pre = user_0_balance.remaining();
let user_1_balance_pre = user_1_balance.remaining();
assert_eq!(user_0_balance_pre, Decimal::from(1000));
assert_eq!(user_1_balance_pre, Decimal::from(2000));
// Generate the providers and the request load
let number_requests = 50;
let mut handles = Vec::new();
// Get the RPC key from the user
let user_0_secret_key = user_get_first_rpc_key(&x_0, &r, &user_0_login)
.await
.secret_key;
let proxy_0_user_0_provider =
create_provider_for_user(x_0.proxy_provider.url(), &user_0_secret_key).await;
let proxy_1_user_0_provider =
create_provider_for_user(x_1.proxy_provider.url(), &user_0_secret_key).await;
let proxy_0_user_0_provider = Arc::new(proxy_0_user_0_provider);
let proxy_1_user_0_provider = Arc::new(proxy_1_user_0_provider);
warn!("Created users, generated providers");
info!("Proxy 1: {:?}", proxy_0_user_0_provider);
info!("Proxy 2: {:?}", proxy_1_user_0_provider);
for _ in 0..number_requests {
// send 2 to proxy 0 user 0
let proxy_0_user_0_provider_clone = proxy_0_user_0_provider.clone();
handles.push(tokio::spawn(async move {
proxy_0_user_0_provider_clone
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", false))
.await
.unwrap()
.unwrap()
}));
let proxy_0_user_0_provider_clone = proxy_0_user_0_provider.clone();
handles.push(tokio::spawn(async move {
proxy_0_user_0_provider_clone
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", false))
.await
.unwrap()
.unwrap()
}));
// send 1 to proxy 1 user 0
let proxy_1_user_0_provider_clone = proxy_1_user_0_provider.clone();
handles.push(tokio::spawn(async move {
proxy_1_user_0_provider_clone
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", false))
.await
.unwrap()
.unwrap()
}));
}
try_join_all(handles).await.unwrap();
// Flush all stats here
// TODO: the test should maybe pause time so that stats definitely flush from our queries.
let flush_0_count = x_0.flush_stats().await.unwrap();
let flush_1_count = x_1.flush_stats().await.unwrap();
// Wait a bit
sleep(Duration::from_secs(5)).await;
info!("Counts 0 are: {:?}", flush_0_count);
assert_eq!(flush_0_count.relational, 1);
assert_eq!(flush_0_count.timeseries, 2);
info!("Counts 1 are: {:?}", flush_1_count);
assert_eq!(flush_1_count.relational, 1);
assert_eq!(flush_1_count.timeseries, 2);
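// sanity-check the arithmetic: each loop iteration sent 2 requests to
// proxy 0 and 1 to proxy 1, so proxy 0 handled 100 requests and proxy 1
// handled 50. each flush writes 1 relational row (one rpc key) and 2
// timeseries points (the "global_proxy" and "opt_in_proxy" measurements).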
// get stats now
// todo!("Need to validate all the stat accounting now");
// Get the stats from here
let mysql_stats = user_get_mysql_stats(&x_0, &r, &user_0_login).await;
info!("mysql stats are: {:?}", mysql_stats);
let influx_aggregate_stats =
user_get_influx_stats_aggregated(&x_0, &r, &user_0_login, chain_id).await;
info!(
"influx_aggregate_stats stats are: {:?}",
influx_aggregate_stats
);
// Get the balance
let user_0_balance_post = user_get_balance(&x_0, &r, &user_0_login).await;
let influx_stats = influx_aggregate_stats["result"].get(0).unwrap();
let mysql_stats = mysql_stats["stats"].get(0).unwrap();
info!("Influx and mysql stats are");
info!(?influx_stats);
info!(?mysql_stats);
assert_eq!(
mysql_stats["error_response"],
influx_stats["error_response"]
);
assert_eq!(
mysql_stats["archive_needed"],
influx_stats["archive_needed"]
);
assert_eq!(
Decimal::from_str(&mysql_stats["chain_id"].to_string().replace('"', "")).unwrap(),
Decimal::from_str(&influx_stats["chain_id"].to_string().replace('"', "")).unwrap()
);
assert_eq!(
Decimal::from_str(&mysql_stats["no_servers"].to_string()).unwrap(),
Decimal::from_str(&influx_stats["no_servers"].to_string()).unwrap()
);
assert_eq!(
Decimal::from_str(&mysql_stats["cache_hits"].to_string()).unwrap(),
Decimal::from_str(&influx_stats["total_cache_hits"].to_string()).unwrap()
);
assert_eq!(
Decimal::from_str(&mysql_stats["cache_misses"].to_string()).unwrap(),
Decimal::from_str(&influx_stats["total_cache_misses"].to_string()).unwrap()
);
assert_eq!(
Decimal::from_str(&mysql_stats["frontend_requests"].to_string()).unwrap(),
Decimal::from_str(&influx_stats["total_frontend_requests"].to_string()).unwrap()
);
assert_eq!(
Decimal::from_str(&mysql_stats["sum_credits_used"].to_string().replace('"', "")).unwrap(),
Decimal::from_str(
&influx_stats["total_credits_used"]
.to_string()
.replace('"', "")
)
.unwrap()
);
assert_eq!(
Decimal::from_str(
&mysql_stats["sum_incl_free_credits_used"]
.to_string()
.replace('"', "")
)
.unwrap(),
Decimal::from_str(
&influx_stats["total_incl_free_credits_used"]
.to_string()
.replace('"', "")
)
.unwrap()
);
assert_eq!(
Decimal::from_str(&mysql_stats["sum_request_bytes"].to_string()).unwrap(),
Decimal::from_str(&influx_stats["total_request_bytes"].to_string()).unwrap()
);
assert_eq!(
Decimal::from_str(&mysql_stats["sum_response_bytes"].to_string()).unwrap(),
Decimal::from_str(
&influx_stats["total_response_bytes"]
.to_string()
.replace('"', "")
)
.unwrap()
);
assert_eq!(
Decimal::from_str(&mysql_stats["sum_response_millis"].to_string()).unwrap(),
Decimal::from_str(
&influx_stats["total_response_millis"]
.to_string()
.replace('"', "")
)
.unwrap()
);
assert_eq!(
Decimal::from_str(&mysql_stats["backend_requests"].to_string()).unwrap(),
Decimal::from_str(&influx_stats["total_backend_requests"].to_string()).unwrap()
);
// We don't have gauges so we can't really fix this in influx. Will get back to this later.
// assert_eq!(
// Decimal::from(user_0_balance_post.remaining()),
// Decimal::from_str(&influx_stats["balance"].to_string()).unwrap()
// );
// The fields we skip for mysql
// backend_retries, id, no_servers, period_datetime, rpc_key_id
// The fields we skip for influx
// collection, rpc_key, time,
// let user_get_influx_stats_detailed =
// user_get_influx_stats_detailed(&x_0, &r, &user_0_login).await;
// info!(
// "user_get_influx_stats_detailed stats are: {:?}",
// user_get_influx_stats_detailed
// );
}
// Example payloads for comparing the mysql stats with influx:
// "stats": [
// {
// "archive_needed": false,
// "backend_requests": 2,
// "backend_retries": 0,
// "cache_hits": 148,
// "cache_misses": 2,
// "chain_id": 999001999,
// "error_response": false,
// "frontend_requests": 150,
// "id": 1,
// "no_servers": 0,
// "period_datetime": "2023-07-13T00:00:00Z",
// "rpc_key_id": 1,
// "sum_credits_used": "180.8000000000",
// "sum_incl_free_credits_used": "180.8000000000",
// "sum_request_bytes": 12433,
// "sum_response_bytes": 230533,
// "sum_response_millis": 194
// }
// ]
// influx
// "result": [
// {
// "archive_needed": false,
// "balance": 939.6,
// "chain_id": "999001999",
// "collection": "opt-in",
// "error_response": false,
// "no_servers": 0,
// "rpc_key": "01H5E9HRZW2S73F1996KPKMYCE",
// "time": "2023-07-16 02:47:56 +00:00",
// "total_backend_requests": 1,
// "total_cache_hits": 49,
// "total_cache_misses": 1,
// "total_credits_used": 60.4,
// "total_frontend_requests": 50,
// "total_incl_free_credits_used": 60.4,
// "total_request_bytes": 4141,
// "total_response_bytes": 76841,
// "total_response_millis": 72
// }
// ]

@@ -16,7 +16,7 @@ async fn it_migrates_the_db() {
let a = TestAnvil::spawn(31337).await;
let db = TestMysql::spawn().await;
let x = TestApp::spawn(&a, Some(&db)).await;
let x = TestApp::spawn(&a, Some(&db), None).await;
// we call flush stats more to be sure it works than because we expect it to save any stats
x.flush_stats().await.unwrap();
@@ -26,7 +26,7 @@ async fn it_migrates_the_db() {
async fn it_starts_and_stops() {
let a = TestAnvil::spawn(31337).await;
let x = TestApp::spawn(&a, None).await;
let x = TestApp::spawn(&a, None, None).await;
let anvil_provider = &a.provider;
let proxy_provider = &x.proxy_provider;

@@ -19,7 +19,7 @@ async fn test_sum_credits_used() {
let db = TestMysql::spawn().await;
let x = TestApp::spawn(&a, Some(&db)).await;
let x = TestApp::spawn(&a, Some(&db), None).await;
let r = reqwest::Client::builder()
.timeout(Duration::from_secs(3))
@@ -78,7 +78,8 @@ async fn test_sum_credits_used() {
// flush stats
let flushed = x.flush_stats().await.unwrap();
assert_eq!(flushed.relational, 2, "relational");
// TODO: this was 2 when we flushed stats for anon users. that was temporarily disabled. it should be turned back on once indexes are fixed
assert_eq!(flushed.relational, 1, "relational");
assert_eq!(flushed.timeseries, 0, "timeseries");
// Give user wallet $1000

@@ -42,7 +42,7 @@ async fn test_log_in_and_out() {
let db = TestMysql::spawn().await;
let x = TestApp::spawn(&a, Some(&db)).await;
let x = TestApp::spawn(&a, Some(&db), None).await;
let r = reqwest::Client::new();
@@ -103,7 +103,7 @@ async fn test_admin_balance_increase() {
let db = TestMysql::spawn().await;
let x = TestApp::spawn(&a, Some(&db)).await;
let x = TestApp::spawn(&a, Some(&db), None).await;
let r = reqwest::Client::builder()
.timeout(Duration::from_secs(20))
@@ -156,7 +156,7 @@ async fn test_user_balance_decreases() {
let db = TestMysql::spawn().await;
let x = TestApp::spawn(&a, Some(&db)).await;
let x = TestApp::spawn(&a, Some(&db), None).await;
let r = reqwest::Client::builder()
.timeout(Duration::from_secs(20))
@@ -264,7 +264,7 @@ async fn test_referral_bonus_non_concurrent() {
let db = TestMysql::spawn().await;
let x = TestApp::spawn(&a, Some(&db)).await;
let x = TestApp::spawn(&a, Some(&db), None).await;
let r = reqwest::Client::builder()
.timeout(Duration::from_secs(20))
@@ -413,7 +413,7 @@ async fn test_referral_bonus_concurrent_referrer_only() {
let db = TestMysql::spawn().await;
let x = TestApp::spawn(&a, Some(&db)).await;
let x = TestApp::spawn(&a, Some(&db), None).await;
let r = reqwest::Client::builder()
.timeout(Duration::from_secs(20))
@@ -573,7 +573,7 @@ async fn test_referral_bonus_concurrent_referrer_and_user() {
let db = TestMysql::spawn().await;
let x = TestApp::spawn(&a, Some(&db)).await;
let x = TestApp::spawn(&a, Some(&db), None).await;
let r = reqwest::Client::builder()
.timeout(Duration::from_secs(20))