diff --git a/config/example.toml b/config/example.toml
index 7c61b8f5..9058b8dd 100644
--- a/config/example.toml
+++ b/config/example.toml
@@ -17,6 +17,7 @@ kafka_urls = "127.0.0.1:19092"
 influxdb_host = "http://127.0.0.1:18086"
 influxdb_org = "dev_org"
 influxdb_token = "dev_web3_proxy_auth_token"
+influxdb_bucket = "dev_web3_proxy"
 
 # thundering herd protection
 # only mark a block as the head block if the sum of their soft limits is greater than or equal to min_sum_soft_limit
diff --git a/docker-compose.yml b/docker-compose.yml
index 1f757dc9..beda587d 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -31,7 +31,7 @@ services:
       DOCKER_INFLUXDB_INIT_USERNAME: dev_web3_proxy
       DOCKER_INFLUXDB_INIT_PASSWORD: dev_web3_proxy
       DOCKER_INFLUXDB_INIT_ORG: dev_org
-      DOCKER_INFLUXDB_INIT_BUCKET: web3_proxy
+      DOCKER_INFLUXDB_INIT_BUCKET: dev_web3_proxy
       DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: dev_web3_proxy_auth_token
     ports:
       - 127.0.0.1:18086:8086
diff --git a/scripts/ethspam b/scripts/ethspam
new file mode 100755
index 00000000..3915594f
Binary files /dev/null and b/scripts/ethspam differ
diff --git a/scripts/generate-requests-and-stats.sh b/scripts/generate-requests-and-stats.sh
new file mode 100644
index 00000000..62d45953
--- /dev/null
+++ b/scripts/generate-requests-and-stats.sh
@@ -0,0 +1,7 @@
+# ethspam (binary checked in above) generates a realistic stream of JSON-RPC requests
+
+# versus is the load-testing client that ethspam pipes into
+# https://github.com/INFURA/versus
+# ./ethspam | ./versus --stop-after 100 "http://localhost:8544/" # pipe into the endpoint; TODO: add a bearer token
+
+./ethspam http://127.0.0.1:8544 | ./versus --stop-after 100000000 http://localhost:8544
diff --git a/scripts/get-stats-aggregated.sh b/scripts/get-stats-aggregated.sh
new file mode 100644
index 00000000..d229bec4
--- /dev/null
+++ b/scripts/get-stats-aggregated.sh
@@ -0,0 +1,7 @@
+# Make a GET request for the stats in an aggregated form
+
+# I don't think we need a user id ...
+
+
+curl -X GET \
+"http://localhost:8544/user/stats/aggregate?query_start=1678463411&query_window_seconds=1000000"
diff --git a/scripts/versus b/scripts/versus
new file mode 100755
index 00000000..6aff1a70
Binary files /dev/null and b/scripts/versus differ
diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs
index ae8445b5..3834eb2a 100644
--- a/web3_proxy/src/app/mod.rs
+++ b/web3_proxy/src/app/mod.rs
@@ -575,6 +575,7 @@ impl Web3ProxyApp {
         // stats can be saved in mysql, influxdb, both, or none
         let stat_sender = if let Some(emitter_spawn) = StatBuffer::try_spawn(
             top_config.app.chain_id,
+            top_config.app.influxdb_bucket.clone().context("No influxdb bucket was provided")?,
            db_conn.clone(),
            influxdb_client.clone(),
            60,
diff --git a/web3_proxy/src/bin/web3_proxy_cli/main.rs b/web3_proxy/src/bin/web3_proxy_cli/main.rs
index 5364d5e6..88b64a0e 100644
--- a/web3_proxy/src/bin/web3_proxy_cli/main.rs
+++ b/web3_proxy/src/bin/web3_proxy_cli/main.rs
@@ -9,6 +9,7 @@ mod create_key;
 mod create_user;
 mod drop_migration_lock;
 mod list_user_tier;
+mod migrate_stats_to_v2;
 mod pagerduty;
 mod popularity_contest;
 mod proxyd;
@@ -372,6 +373,14 @@ fn main() -> anyhow::Result<()> {
                     x.main(&db_conn).await
                 }
+                SubCommand::MigrateStatsToV2(x) => {
+                    let db_url = cli_config
+                        .db_url
+                        .expect("'--config' (with a db) or '--db-url' is required to run the migration from stats-mysql to stats-influx");
+
+                    let db_conn = get_db(db_url, 1, 1).await?;
+                    x.main(&db_conn).await
+                }
                 SubCommand::Pagerduty(x) => {
                     if cli_config.sentry_url.is_none() {
                         warn!("sentry_url is not set! Logs will only show in this console");
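Only the wiring for the new subcommand is part of this diff; the `migrate_stats_to_v2.rs` module body is not shown. As a reading aid, here is a rough sketch of the shape the subcommand presumably takes, mirroring the `SubCommand::MigrateStatsToV2(x)` arm above. The struct name, the argh derive, and the method body are all assumptions, not code from the PR:

```rust
// Hypothetical sketch only; the real migrate_stats_to_v2.rs is not included in this diff.
use argh::FromArgs;
use sea_orm::DatabaseConnection;

/// Migrate stats from the old mysql rpc_accounting table to the v2 (influx) pipeline.
#[derive(Debug, FromArgs, PartialEq, Eq)]
#[argh(subcommand, name = "migrate_stats_to_v2")]
pub struct MigrateStatsToV2SubCommand {}

impl MigrateStatsToV2SubCommand {
    pub async fn main(&self, _db_conn: &DatabaseConnection) -> anyhow::Result<()> {
        // presumably: page through the old mysql rows and replay them through the new StatBuffer
        Ok(())
    }
}
```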
diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs
index f9010902..1ba1b298 100644
--- a/web3_proxy/src/config.rs
+++ b/web3_proxy/src/config.rs
@@ -178,6 +178,9 @@ pub struct AppConfig {
     /// influxdb token for stats
     pub influxdb_token: Option<String>,
 
+    /// influxdb bucket to use for stats
+    pub influxdb_bucket: Option<String>,
+
     /// unknown config options get put here
     #[serde(flatten, default = "HashMap::default")]
     pub extra: HashMap<String, serde_json::Value>,
diff --git a/web3_proxy/src/http_params.rs b/web3_proxy/src/http_params.rs
index 8b30efa0..9f871e3c 100644
--- a/web3_proxy/src/http_params.rs
+++ b/web3_proxy/src/http_params.rs
@@ -219,7 +219,7 @@ pub fn get_query_window_seconds_from_params(
     params.get("query_window_seconds").map_or_else(
         || {
             // no page in params. set default
-            Ok(0)
+            Ok(1)
         },
         |query_window_seconds: &String| {
             // parse the given timestamp
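The `Ok(0)` to `Ok(1)` change above is load-bearing: the value is later interpolated into `aggregateWindow(every: {query_window_seconds}s, ...)`, and a zero-second window is not a usable aggregation interval. A standalone sketch of the resulting behavior, with a simplified error type (the real function returns a `FrontendErrorResponse`):

```rust
use hashbrown::HashMap;

// Simplified stand-in for get_query_window_seconds_from_params:
// default to a 1s window when the caller does not provide one.
fn query_window_seconds(params: &HashMap<String, String>) -> anyhow::Result<u64> {
    params.get("query_window_seconds").map_or_else(
        // no query_window_seconds in params. set default
        || Ok(1),
        // parse the given number of seconds
        |secs| {
            secs.parse::<u64>()
                .map_err(|err| anyhow::anyhow!("invalid query_window_seconds: {}", err))
        },
    )
}
```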
diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs
index d38f5865..1aafbaf9 100644
--- a/web3_proxy/src/stats/influxdb_queries.rs
+++ b/web3_proxy/src/stats/influxdb_queries.rs
@@ -18,8 +18,10 @@
 use fstrings::{f, format_args_f};
 use hashbrown::HashMap;
 use influxdb2::models::Query;
 use influxdb2::FromDataPoint;
+use itertools::Itertools;
+use log::info;
 use serde::Serialize;
 use serde_json::json;
 // TODO: include chain_id, method, and some other things in this struct
 #[derive(Debug, Default, FromDataPoint, Serialize)]
@@ -35,10 +37,12 @@ pub async fn query_user_stats<'a>(
     params: &'a HashMap<String, String>,
     stat_response_type: StatType,
 ) -> Result<Response, FrontendErrorResponse> {
+    info!("query_user_stats: checking db");
     let db_conn = app.db_conn().context("query_user_stats needs a db")?;
     let db_replica = app
         .db_replica()
         .context("query_user_stats needs a db replica")?;
+    info!("query_user_stats: checking redis");
     let mut redis_conn = app
         .redis_conn()
         .await
@@ -46,28 +50,52 @@
         .context("query_user_stats needs a redis")?;
 
     // TODO: have a getter for this. do we need a connection pool on it?
+    info!("query_user_stats: checking influxdb client");
     let influxdb_client = app
         .influxdb_client
         .as_ref()
         .context("query_user_stats needs an influxdb client")?;
 
     // get the user id first. if it is 0, we should use a cache on the app
     let user_id =
         get_user_id_from_params(&mut redis_conn, &db_conn, &db_replica, bearer, params).await?;
+    info!("query_user_stats: parsing query params");
 
     let query_window_seconds = get_query_window_seconds_from_params(params)?;
     let query_start = get_query_start_from_params(params)?.timestamp();
     let query_stop = get_query_stop_from_params(params)?.timestamp();
     let chain_id = get_chain_id_from_params(app, params)?;
 
+    // query_window_seconds now defaults to 1 (see http_params.rs) because aggregateWindow rejects a 0s interval
+
+    // return a bad request if query_start == query_stop, because the query range would be empty
+    if query_start == query_stop {
+        return Err(FrontendErrorResponse::BadRequest(
+            "Start and Stop date cannot be equal. Please specify a (different) start date.".to_owned(),
+        ));
+    }
+
     let measurement = if user_id == 0 {
         "global_proxy"
     } else {
         "opt_in_proxy"
     };
 
-    let bucket = "web3_proxy";
+    // reference query, as built in the influxdb UI:
+    // from(bucket: "dev_web3_proxy")
+    //     |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
+    //     |> filter(fn: (r) => r["_measurement"] == "opt_in_proxy" or r["_measurement"] == "global_proxy")
+    //     |> filter(fn: (r) => r["_field"] == "frontend_requests" or r["_field"] == "backend_requests" or r["_field"] == "sum_request_bytes")
+    //     |> group(columns: ["_field", "_measurement"])
+    //     |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
+    //     |> yield(name: "mean")
+    // TODO: turn a missing bucket into a 500 error instead of a generic anyhow error
+    let bucket = app
+        .config
+        .influxdb_bucket
+        .as_ref()
+        .context("No influxdb bucket was provided")?;
+    info!("Bucket is {:?}", bucket);
+
     let mut group_columns = vec!["_measurement", "_field"];
     let mut filter_chain_id = "".to_string();
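The hunks below assemble these pieces into one Flux pipeline. Rendered with example values (an aggregated query on chain 1; the bucket name, timestamps, window, and the shortened field list are invented for illustration), the result looks roughly like this:

```rust
// Illustration only: what the assembled Flux text looks like after interpolation.
fn example_aggregated_flux() -> String {
    let bucket = "dev_web3_proxy";
    let measurement = "global_proxy";
    format!(
        r#"from(bucket: "{bucket}")
    |> range(start: 1678463411, stop: 1679463411)
    |> filter(fn: (r) => r["_measurement"] == "{measurement}")
    |> filter(fn: (r) => r["_field"] == "frontend_requests" or r["_field"] == "backend_requests")
    |> filter(fn: (r) => r["chain_id"] == "1")
    |> group(columns: ["_measurement", "_field"])
    |> aggregateWindow(every: 1000000s, fn: sum, createEmpty: false)
    |> yield(name: "sum")"#
    )
}
```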
@@ -77,18 +105,48 @@
         filter_chain_id = f!(r#"|> filter(fn: (r) => r["chain_id"] == "{chain_id}")"#);
     }
 
     let group_columns = serde_json::to_string(&json!(group_columns)).unwrap();
 
     let group = match stat_response_type {
         StatType::Aggregated => f!(r#"|> group(columns: {group_columns})"#),
         StatType::Detailed => "".to_string(),
     };
 
     let filter_field = match stat_response_type {
-        StatType::Aggregated => f!(r#"|> filter(fn: (r) => r["_field"] == "frontend_requests")"#),
+        // aggregated stats now return all of the collected fields, not just frontend_requests
+        StatType::Aggregated => f!(r#"|> filter(fn: (r) => r["_field"] == "frontend_requests" or r["_field"] == "backend_requests" or r["_field"] == "cache_hits" or r["_field"] == "cache_misses" or r["_field"] == "no_servers" or r["_field"] == "sum_request_bytes" or r["_field"] == "sum_response_bytes" or r["_field"] == "sum_response_millis")"#),
         StatType::Detailed => "".to_string(),
     };
 
+    info!("Query start and stop are: {:?} {:?}", query_start, query_stop);
+    info!("Query measurement is: {:?}", measurement);
+    info!("Filters are: {:?} {:?}", filter_field, filter_chain_id);
+    info!("Group is: {:?}", group);
+    info!("Window seconds are: {:?}", query_window_seconds);
+
+    // fields already collected by this query: backend_requests, frontend_requests, cache_hits, cache_misses, no_servers, sum_request_bytes, sum_response_bytes, sum_response_millis
+
+    // a second query will probably be needed for error responses ("total_error_responses") and archive requests ("archive_request");
+    // "total_backend_retries" is not in the timeseries yet either
+
+    // group by method if detailed, else keep all methods as "null" ("method": null); influxdb should take care of that
+
     let query = f!(r#"
         from(bucket: "{bucket}")
             |> range(start: {query_start}, stop: {query_stop})
@@ -96,15 +154,76 @@
             |> filter(fn: (r) => r["_measurement"] == "{measurement}")
             {filter_field}
             {filter_chain_id}
             {group}
-            |> aggregateWindow(every: {query_window_seconds}, fn: mean, createEmpty: false)
-            |> yield(name: "mean")
+            |> aggregateWindow(every: {query_window_seconds}s, fn: sum, createEmpty: false)
+            |> yield(name: "sum")
     "#);
 
+    info!("Raw query to influx is: {:?}", query);
     let query = Query::new(query.to_string());
+    info!("Query to influx is: {:?}", query);
 
     // TODO: do not unwrap. add this error to FrontErrorResponse
     // TODO: StatType::Aggregated and StatType::Detailed might need different types
-    let res: Vec<AggregatedRpcAccounting> = influxdb_client.query(Some(query)).await?;
+    let influx_responses: Vec<AggregatedRpcAccounting> = influxdb_client.query(Some(query)).await?;
+    info!("Influx responses are {:?}", &influx_responses);
 
-    Ok(Json(json!(res)).into_response())
+    // group all fields for the same timestamp together
+    // note: group_by only merges adjacent items, so this relies on influx returning rows sorted by time
+    let datapoints = influx_responses
+        .into_iter()
+        .group_by(|x| x.time)
+        .into_iter()
+        .map(|(_timestamp, grouped_items)| {
+            // put all the fields of one window next to each other
+            // (influx returns one row per field per timestamp; we want one object per window)
+            let mut out = HashMap::new();
+
+            for x in grouped_items {
+                out.insert(f!(r#"total_{x.field}"#), json!(x.value));
+
+                if !out.contains_key("query_window_timestamp") {
+                    out.insert(
+                        "query_window_timestamp".to_owned(),
+                        json!(x.time.timestamp()),
+                    );
+                }
+            }
+            json!(out)
+        })
+        .collect::<Vec<_>>();
+
+    // archive requests and error responses could either be gathered by default (and summed up), or retrieved with a second query
+    let mut out = HashMap::new();
+    out.insert("num_items", serde_json::Value::Number(datapoints.len().into()));
+    out.insert("result", serde_json::Value::Array(datapoints));
+    out.insert(
+        "query_window_seconds",
+        serde_json::Value::Number(query_window_seconds.into()),
+    );
+    out.insert("query_start", serde_json::Value::Number(query_start.into()));
+    out.insert("chain_id", serde_json::Value::Number(chain_id.into()));
+
+    info!("Response envelope is {:?}", out);
+    let out = Json(json!(out)).into_response();
+
+    // TODO: now implement the proper response type
+
+    Ok(out)
 }
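The timestamp grouping above is worth spelling out, because `Itertools::group_by` only merges adjacent items with equal keys: it works here on the assumption that influx returns rows sorted by `_time`. A self-contained sketch of the same fold, with a simplified row type standing in for `AggregatedRpcAccounting`:

```rust
use itertools::Itertools;
use serde_json::json;
use std::collections::HashMap;

// Simplified stand-in for the influx response rows.
struct Row {
    time: i64, // unix seconds; the real code uses a chrono DateTime
    field: String,
    value: f64,
}

// Collapse one-row-per-field into one JSON object per timestamp, as in the handler above.
fn group_rows(rows: Vec<Row>) -> Vec<serde_json::Value> {
    rows.into_iter()
        .group_by(|row| row.time)
        .into_iter()
        .map(|(time, fields)| {
            let mut out = HashMap::new();
            out.insert("query_window_timestamp".to_string(), json!(time));
            for row in fields {
                out.insert(format!("total_{}", row.field), json!(row.value));
            }
            json!(out)
        })
        .collect()
}
```

If sorted input cannot be guaranteed, folding into a map keyed by timestamp would be the safer variant.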
diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs
index e0ad8f39..1060e641 100644
--- a/web3_proxy/src/stats/mod.rs
+++ b/web3_proxy/src/stats/mod.rs
@@ -394,6 +394,7 @@ impl RpcQueryStats {
 impl StatBuffer {
     pub fn try_spawn(
         chain_id: u64,
+        bucket: String,
         db_conn: Option<DatabaseConnection>,
         influxdb_client: Option<influxdb2::Client>,
         db_save_interval_seconds: u32,
@@ -418,7 +419,7 @@ impl StatBuffer {
 
         // any errors inside this task will cause the application to exit
         let handle = tokio::spawn(async move {
-            new.aggregate_and_save_loop(stat_receiver, shutdown_receiver)
+            new.aggregate_and_save_loop(bucket, stat_receiver, shutdown_receiver)
                 .await
         });
 
@@ -427,6 +428,7 @@ impl StatBuffer {
 
     async fn aggregate_and_save_loop(
         &mut self,
+        bucket: String,
         stat_receiver: flume::Receiver<AppStat>,
         mut shutdown_receiver: broadcast::Receiver<()>,
     ) -> anyhow::Result<()> {
@@ -486,14 +488,14 @@ impl StatBuffer {
                     for (key, stat) in global_timeseries_buffer.drain() {
                         // TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
-                        if let Err(err) = stat.save_timeseries("dev_web3_proxy", "global_proxy", self.chain_id, influxdb_client, key).await {
+                        if let Err(err) = stat.save_timeseries(&bucket, "global_proxy", self.chain_id, influxdb_client, key).await {
                             error!("unable to save global stat! err={:?}", err);
                         };
                     }
 
                     for (key, stat) in opt_in_timeseries_buffer.drain() {
                         // TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
-                        if let Err(err) = stat.save_timeseries("dev_web3_proxy", "opt_in_proxy", self.chain_id, influxdb_client, key).await {
+                        if let Err(err) = stat.save_timeseries(&bucket, "opt_in_proxy", self.chain_id, influxdb_client, key).await {
                             error!("unable to save opt-in stat! err={:?}", err);
                         };
                     }
@@ -538,7 +540,7 @@ impl StatBuffer {
             for (key, stat) in global_timeseries_buffer.drain() {
                 if let Err(err) = stat
                     .save_timeseries(
-                        "dev_web3_proxy",
+                        &bucket,
                         "global_proxy",
                         self.chain_id,
                         influxdb_client,
@@ -561,7 +563,7 @@ impl StatBuffer {
             for (key, stat) in opt_in_timeseries_buffer.drain() {
                 if let Err(err) = stat
                     .save_timeseries(
-                        "dev_web3_proxy",
+                        &bucket,
                         "opt_in_proxy",
                         self.chain_id,
                         influxdb_client,
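Putting the pieces together: the aggregate endpoint exercised by `scripts/get-stats-aggregated.sh` now responds with an envelope shaped like the `HashMap` built in `influxdb_queries.rs` above. A sketch of that shape; every number here is invented for illustration:

```rust
use serde_json::{json, Value};

// Shape of the response envelope returned by /user/stats/aggregate (values made up).
fn example_envelope() -> Value {
    json!({
        "chain_id": 1,
        "num_items": 1,
        "query_start": 1678463411,
        "query_window_seconds": 1000000,
        "result": [{
            "query_window_timestamp": 1678463411,
            "total_frontend_requests": 6.0,
            "total_cache_hits": 6.0,
            "total_cache_misses": 0.0,
            "total_sum_response_bytes": 235.0,
            "total_sum_response_millis": 0.0
        }]
    })
}
```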