diff --git a/Cargo.lock b/Cargo.lock index 8e0b16ff..b26c57a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -992,6 +992,16 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "core-foundation" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.3" @@ -1937,6 +1947,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.1.0" @@ -1966,6 +1991,37 @@ dependencies = [ "winapi", ] +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + +[[package]] +name = "fstrings" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7845a0f15da505ac36baad0486612dab57f8b8d34e19c5470a265bbcdd572ae6" +dependencies = [ + "fstrings-proc-macro", + "proc-macro-hack", +] + +[[package]] +name = "fstrings-proc-macro" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63b58c0e7581dc33478a32299182cbe5ae3b8c028be26728a47fb0a113c92d9d" +dependencies = [ + "proc-macro-hack", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "funty" version = "1.1.0" @@ -2481,6 +2537,19 @@ dependencies = [ "tokio-rustls", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "iana-time-zone" version = "0.1.53" @@ -3002,6 +3071,24 @@ dependencies = [ "getrandom", ] +[[package]] +name = "native-tls" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "new_debug_unreachable" version = "1.0.4" @@ -3229,6 +3316,51 @@ dependencies = [ "syn", ] +[[package]] +name = "openssl" +version = "0.10.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b102428fd03bc5edf97f62620f7298614c45cedf287c271e7ed450bbaf83f2e1" +dependencies = [ + "bitflags", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + +[[package]] +name = "openssl-sys" +version = "0.9.80" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23bbbf7854cd45b83958ebe919f0e8e516793727652e27fda10a8384cfc790b7" +dependencies = [ + "autocfg", + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "opentelemetry" version = "0.13.0" @@ -4016,10 +4148,12 @@ dependencies = [ "http-body", "hyper", "hyper-rustls", + "hyper-tls", "ipnet", "js-sys", "log", "mime", + "native-tls", "once_cell", "percent-encoding", "pin-project-lite", @@ -4029,6 +4163,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "tokio", + "tokio-native-tls", "tokio-rustls", "tokio-util", "tower-service", @@ -4304,6 +4439,15 @@ dependencies = [ "syn", ] +[[package]] +name = "schannel" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "713cfb06c7059f3588fb8044c0fad1d09e3c01d225e25b9220dbfdcf16dbb1b3" +dependencies = [ + "windows-sys 0.42.0", +] + [[package]] name = "scheduled-thread-pool" version = "0.2.6" @@ -4536,6 +4680,29 @@ dependencies = [ "zeroize", ] +[[package]] +name = "security-framework" +version = "2.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a332be01508d814fed64bf28f798a146d73792121129962fdf335bb3c49a4254" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31c9bb296072e961fcbd8853511dd39c2d8be2deb1e17c6860b1d30732b323b4" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "semver" version = "1.0.16" @@ -5417,6 +5584,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.23.4" @@ -6101,6 +6278,7 @@ dependencies = [ "ewma", "fdlimit", "flume", + "fstrings", "futures", "gethostname", "glob", diff --git a/docker-compose.yml b/docker-compose.yml index beda587d..1f757dc9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -31,7 +31,7 @@ services: DOCKER_INFLUXDB_INIT_USERNAME: dev_web3_proxy DOCKER_INFLUXDB_INIT_PASSWORD: dev_web3_proxy DOCKER_INFLUXDB_INIT_ORG: dev_org - DOCKER_INFLUXDB_INIT_BUCKET: dev_web3_proxy + DOCKER_INFLUXDB_INIT_BUCKET: web3_proxy DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: dev_web3_proxy_auth_token ports: - 127.0.0.1:18086:8086 diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 41184e26..e1aaef47 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -43,6 +43,7 @@ ethers = { version = "1.0.2", default-features = false, features = ["rustls", "w ewma = "0.1.1" fdlimit = "0.2.1" flume = "0.10.14" +fstrings = "0.2" futures = { version = "0.3.26", features = ["thread-pool"] } gethostname = "0.4.1" glob = "0.3.1" @@ -50,7 +51,7 @@ handlebars = "4.3.6" hashbrown = { version = "0.13.2", features = ["serde"] } hdrhistogram = "7.5.2" http = "0.2.9" -influxdb2 = { version = "0.3", features = ["rustls"], default-features = false } +influxdb2 = { version = "0.3", features = ["rustls"] } ipnet = "2.7.1" itertools = "0.10.5" log = "0.4.17" diff --git a/web3_proxy/examples/influxdb2.rs b/web3_proxy/examples/influxdb2.rs new file mode 100644 index 00000000..b16793de --- /dev/null +++ b/web3_proxy/examples/influxdb2.rs @@ -0,0 +1,42 @@ +use chrono::{DateTime, FixedOffset}; +use influxdb2::models::Query; +use influxdb2::{Client, FromDataPoint, FromMap}; + +#[derive(Debug, FromDataPoint)] +pub struct StockPrice { + ticker: String, + value: f64, + time: DateTime, +} + +impl Default for StockPrice { + fn default() -> Self { + Self { + ticker: "".to_string(), + value: 0_f64, + time: chrono::MIN_DATETIME.with_timezone(&chrono::FixedOffset::east(7 * 3600)), + } + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let host = std::env::var("INFLUXDB_HOST").unwrap(); + let org = std::env::var("INFLUXDB_ORG").unwrap(); + let token = std::env::var("INFLUXDB_TOKEN").unwrap(); + let client = Client::new(host, org, token); + + let qs = format!( + "from(bucket: \"stock-prices\") + |> range(start: -1w) + |> filter(fn: (r) => r.ticker == \"{}\") + |> last() + ", + "AAPL" + ); + let query = Query::new(qs.to_string()); + let res: Vec = client.query::(Some(query)).await?; + println!("{:?}", res); + + Ok(()) +} diff --git a/web3_proxy/src/http_params.rs b/web3_proxy/src/http_params.rs index b462274a..8b30efa0 100644 --- a/web3_proxy/src/http_params.rs +++ b/web3_proxy/src/http_params.rs @@ -176,11 +176,37 @@ pub fn get_query_start_from_params( }, |x: &String| { // parse the given timestamp - let x = x.parse::().context("parsing timestamp query param")?; + let x = x + .parse::() + .context("parsing start timestamp query param")?; - // TODO: error code 401 - let x = - NaiveDateTime::from_timestamp_opt(x, 0).context("parsing timestamp query param")?; + let x = NaiveDateTime::from_timestamp_opt(x, 0) + .context("parsing start timestamp query param")?; + + Ok(x) + }, + ) +} + +// TODO: return chrono::Utc instead? +pub fn get_query_stop_from_params( + params: &HashMap, +) -> anyhow::Result { + params.get("query_stop").map_or_else( + || { + // no timestamp in params. set default + let x = chrono::Utc::now(); + + Ok(x.naive_utc()) + }, + |x: &String| { + // parse the given timestamp + let x = x + .parse::() + .context("parsing stop timestamp query param")?; + + let x = NaiveDateTime::from_timestamp_opt(x, 0) + .context("parsing stop timestamp query param")?; Ok(x) }, diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 04d0d401..bda39389 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -334,7 +334,7 @@ impl Web3Rpcs { // TODO: request_metadata or authorization? // we don't actually set min_block_needed here because all nodes have all blocks let response = self - .try_send_best_consensus_head_connection(authorization, request, None, None, None) + .try_send_best_consensus_head_connection(authorization, request, None, Some(num), None) .await?; if let Some(err) = response.error { diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 32d02e16..b908ecf1 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -773,13 +773,15 @@ impl Web3Rpc { new_total_requests = rpc.total_requests.load(atomic::Ordering::Relaxed); + // TODO: how many requests should we require in order to skip a health check? if new_total_requests - old_total_requests < 10 { // TODO: if this fails too many times, reset the connection // TODO: move this into a function and the chaining should be easier let head_block = rpc.head_block.read().clone(); if let Some((block_number, txid)) = head_block.and_then(|x| { - let block = x.block; + // let block = x.block; + let block = x.block.clone(); let block_number = block.number?; let txid = block.transactions.last().cloned()?; diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs index a5833d44..32ac6e61 100644 --- a/web3_proxy/src/stats/influxdb_queries.rs +++ b/web3_proxy/src/stats/influxdb_queries.rs @@ -1,7 +1,11 @@ use super::StatType; use crate::{ - app::Web3ProxyApp, frontend::errors::FrontendErrorResponse, - http_params::get_user_id_from_params, + app::Web3ProxyApp, + frontend::errors::FrontendErrorResponse, + http_params::{ + get_chain_id_from_params, get_page_from_params, get_query_start_from_params, + get_query_stop_from_params, get_query_window_seconds_from_params, get_user_id_from_params, + }, }; use anyhow::Context; use axum::{ @@ -9,7 +13,12 @@ use axum::{ response::Response, TypedHeader, }; +use chrono::{DateTime, FixedOffset}; +use fstrings::{f, format_args_f}; use hashbrown::HashMap; +use influxdb2::models::Query; +use influxdb2::{Client, FromDataPoint}; +use serde_json::json; pub async fn query_user_stats<'a>( app: &'a Web3ProxyApp, @@ -37,5 +46,55 @@ pub async fn query_user_stats<'a>( let user_id = get_user_id_from_params(&mut redis_conn, &db_conn, &db_replica, bearer, params).await?; + let query_window_seconds = get_query_window_seconds_from_params(params)?; + let query_start = get_query_start_from_params(params)?.timestamp(); + let query_stop = get_query_stop_from_params(params)?.timestamp(); + let chain_id = get_chain_id_from_params(app, params)?; + let page = get_page_from_params(params)?; + + let measurement = if user_id == 0 { + "global_proxy" + } else { + "opt_in_proxy" + }; + + let bucket = "web3_proxy"; + + let mut group_columns = vec!["_measurement", "_field"]; + let mut filter_chain_id = "".to_string(); + + if chain_id == 0 { + group_columns.push("chain_id"); + } else { + filter_chain_id = f!(r#"|> filter(fn: (r) => r["chain_id"] == "{chain_id}")"#); + } + + let group_columns = serde_json::to_string(&json!(group_columns)).unwrap(); + + let group = match stat_response_type { + StatType::Aggregated => f!(r#"|> group(columns: {group_columns})"#), + StatType::Detailed => "".to_string(), + }; + + let filter_field = match stat_response_type { + StatType::Aggregated => f!(r#"|> filter(fn: (r) => r["_field"] == "frontend_requests")"#), + StatType::Detailed => "".to_string(), + }; + + let query = f!(r#" + from(bucket: "{bucket}") + |> range(start: {query_start}, stop: {query_stop}) + |> filter(fn: (r) => r["_measurement"] == "{measurement}") + {filter_field} + {filter_chain_id} + {group} + |> aggregateWindow(every: {query_window_seconds}, fn: mean, createEmpty: false) + |> yield(name: "mean") + "#); + + let query = Query::new(query); + + // let res: Vec<_> = influxdb_client.query(Some(query)).await?; + todo!(); } diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index a34959ca..e0ad8f39 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -1,5 +1,5 @@ //! Store "stats" in a database for billing and a different database for graphing -//! +//! //! TODO: move some of these structs/functions into their own file? pub mod db_queries; pub mod influxdb_queries;