wip influxdb2

This commit is contained in:
Bryan Stitt 2023-02-21 20:25:02 -08:00 committed by yenicelik
parent 3f217b930a
commit dbd7860416
9 changed files with 319 additions and 11 deletions

178
Cargo.lock generated

@ -992,6 +992,16 @@ dependencies = [
"unicode-segmentation",
]
[[package]]
name = "core-foundation"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "core-foundation-sys"
version = "0.8.3"
@ -1937,6 +1947,21 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "foreign-types"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
dependencies = [
"foreign-types-shared",
]
[[package]]
name = "foreign-types-shared"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
[[package]]
name = "form_urlencoded"
version = "1.1.0"
@ -1966,6 +1991,37 @@ dependencies = [
"winapi",
]
[[package]]
name = "fsevent-sys"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2"
dependencies = [
"libc",
]
[[package]]
name = "fstrings"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7845a0f15da505ac36baad0486612dab57f8b8d34e19c5470a265bbcdd572ae6"
dependencies = [
"fstrings-proc-macro",
"proc-macro-hack",
]
[[package]]
name = "fstrings-proc-macro"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63b58c0e7581dc33478a32299182cbe5ae3b8c028be26728a47fb0a113c92d9d"
dependencies = [
"proc-macro-hack",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "funty"
version = "1.1.0"
@ -2481,6 +2537,19 @@ dependencies = [
"tokio-rustls",
]
[[package]]
name = "hyper-tls"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
dependencies = [
"bytes",
"hyper",
"native-tls",
"tokio",
"tokio-native-tls",
]
[[package]]
name = "iana-time-zone"
version = "0.1.53"
@ -3002,6 +3071,24 @@ dependencies = [
"getrandom",
]
[[package]]
name = "native-tls"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e"
dependencies = [
"lazy_static",
"libc",
"log",
"openssl",
"openssl-probe",
"openssl-sys",
"schannel",
"security-framework",
"security-framework-sys",
"tempfile",
]
[[package]]
name = "new_debug_unreachable"
version = "1.0.4"
@ -3229,6 +3316,51 @@ dependencies = [
"syn",
]
[[package]]
name = "openssl"
version = "0.10.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b102428fd03bc5edf97f62620f7298614c45cedf287c271e7ed450bbaf83f2e1"
dependencies = [
"bitflags",
"cfg-if",
"foreign-types",
"libc",
"once_cell",
"openssl-macros",
"openssl-sys",
]
[[package]]
name = "openssl-macros"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "openssl-probe"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "openssl-sys"
version = "0.9.80"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23bbbf7854cd45b83958ebe919f0e8e516793727652e27fda10a8384cfc790b7"
dependencies = [
"autocfg",
"cc",
"libc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "opentelemetry"
version = "0.13.0"
@ -4016,10 +4148,12 @@ dependencies = [
"http-body",
"hyper",
"hyper-rustls",
"hyper-tls",
"ipnet",
"js-sys",
"log",
"mime",
"native-tls",
"once_cell",
"percent-encoding",
"pin-project-lite",
@ -4029,6 +4163,7 @@ dependencies = [
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-native-tls",
"tokio-rustls",
"tokio-util",
"tower-service",
@ -4304,6 +4439,15 @@ dependencies = [
"syn",
]
[[package]]
name = "schannel"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "713cfb06c7059f3588fb8044c0fad1d09e3c01d225e25b9220dbfdcf16dbb1b3"
dependencies = [
"windows-sys 0.42.0",
]
[[package]]
name = "scheduled-thread-pool"
version = "0.2.6"
@ -4536,6 +4680,29 @@ dependencies = [
"zeroize",
]
[[package]]
name = "security-framework"
version = "2.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a332be01508d814fed64bf28f798a146d73792121129962fdf335bb3c49a4254"
dependencies = [
"bitflags",
"core-foundation",
"core-foundation-sys",
"libc",
"security-framework-sys",
]
[[package]]
name = "security-framework-sys"
version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31c9bb296072e961fcbd8853511dd39c2d8be2deb1e17c6860b1d30732b323b4"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "semver"
version = "1.0.16"
@ -5417,6 +5584,16 @@ dependencies = [
"syn",
]
[[package]]
name = "tokio-native-tls"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2"
dependencies = [
"native-tls",
"tokio",
]
[[package]]
name = "tokio-rustls"
version = "0.23.4"
@ -6101,6 +6278,7 @@ dependencies = [
"ewma",
"fdlimit",
"flume",
"fstrings",
"futures",
"gethostname",
"glob",

@ -31,7 +31,7 @@ services:
DOCKER_INFLUXDB_INIT_USERNAME: dev_web3_proxy
DOCKER_INFLUXDB_INIT_PASSWORD: dev_web3_proxy
DOCKER_INFLUXDB_INIT_ORG: dev_org
DOCKER_INFLUXDB_INIT_BUCKET: dev_web3_proxy
DOCKER_INFLUXDB_INIT_BUCKET: web3_proxy
DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: dev_web3_proxy_auth_token
ports:
- 127.0.0.1:18086:8086

@ -43,6 +43,7 @@ ethers = { version = "1.0.2", default-features = false, features = ["rustls", "w
ewma = "0.1.1"
fdlimit = "0.2.1"
flume = "0.10.14"
fstrings = "0.2"
futures = { version = "0.3.26", features = ["thread-pool"] }
gethostname = "0.4.1"
glob = "0.3.1"
@ -50,7 +51,7 @@ handlebars = "4.3.6"
hashbrown = { version = "0.13.2", features = ["serde"] }
hdrhistogram = "7.5.2"
http = "0.2.9"
influxdb2 = { version = "0.3", features = ["rustls"], default-features = false }
influxdb2 = { version = "0.3", features = ["rustls"] }
ipnet = "2.7.1"
itertools = "0.10.5"
log = "0.4.17"

@ -0,0 +1,42 @@
use chrono::{DateTime, FixedOffset};
use influxdb2::models::Query;
use influxdb2::{Client, FromDataPoint, FromMap};
#[derive(Debug, FromDataPoint)]
pub struct StockPrice {
ticker: String,
value: f64,
time: DateTime<FixedOffset>,
}
impl Default for StockPrice {
fn default() -> Self {
Self {
ticker: "".to_string(),
value: 0_f64,
time: chrono::MIN_DATETIME.with_timezone(&chrono::FixedOffset::east(7 * 3600)),
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let host = std::env::var("INFLUXDB_HOST").unwrap();
let org = std::env::var("INFLUXDB_ORG").unwrap();
let token = std::env::var("INFLUXDB_TOKEN").unwrap();
let client = Client::new(host, org, token);
let qs = format!(
"from(bucket: \"stock-prices\")
|> range(start: -1w)
|> filter(fn: (r) => r.ticker == \"{}\")
|> last()
",
"AAPL"
);
let query = Query::new(qs.to_string());
let res: Vec<StockPrice> = client.query::<StockPrice>(Some(query)).await?;
println!("{:?}", res);
Ok(())
}

@ -176,11 +176,37 @@ pub fn get_query_start_from_params(
},
|x: &String| {
// parse the given timestamp
let x = x.parse::<i64>().context("parsing timestamp query param")?;
let x = x
.parse::<i64>()
.context("parsing start timestamp query param")?;
// TODO: error code 401
let x =
NaiveDateTime::from_timestamp_opt(x, 0).context("parsing timestamp query param")?;
let x = NaiveDateTime::from_timestamp_opt(x, 0)
.context("parsing start timestamp query param")?;
Ok(x)
},
)
}
// TODO: return chrono::Utc instead?
pub fn get_query_stop_from_params(
params: &HashMap<String, String>,
) -> anyhow::Result<chrono::NaiveDateTime> {
params.get("query_stop").map_or_else(
|| {
// no timestamp in params. set default
let x = chrono::Utc::now();
Ok(x.naive_utc())
},
|x: &String| {
// parse the given timestamp
let x = x
.parse::<i64>()
.context("parsing stop timestamp query param")?;
let x = NaiveDateTime::from_timestamp_opt(x, 0)
.context("parsing stop timestamp query param")?;
Ok(x)
},

@ -334,7 +334,7 @@ impl Web3Rpcs {
// TODO: request_metadata or authorization?
// we don't actually set min_block_needed here because all nodes have all blocks
let response = self
.try_send_best_consensus_head_connection(authorization, request, None, None, None)
.try_send_best_consensus_head_connection(authorization, request, None, Some(num), None)
.await?;
if let Some(err) = response.error {

@ -773,13 +773,15 @@ impl Web3Rpc {
new_total_requests = rpc.total_requests.load(atomic::Ordering::Relaxed);
// TODO: how many requests should we require in order to skip a health check?
if new_total_requests - old_total_requests < 10 {
// TODO: if this fails too many times, reset the connection
// TODO: move this into a function and the chaining should be easier
let head_block = rpc.head_block.read().clone();
if let Some((block_number, txid)) = head_block.and_then(|x| {
let block = x.block;
// let block = x.block;
let block = x.block.clone();
let block_number = block.number?;
let txid = block.transactions.last().cloned()?;

@ -1,7 +1,11 @@
use super::StatType;
use crate::{
app::Web3ProxyApp, frontend::errors::FrontendErrorResponse,
http_params::get_user_id_from_params,
app::Web3ProxyApp,
frontend::errors::FrontendErrorResponse,
http_params::{
get_chain_id_from_params, get_page_from_params, get_query_start_from_params,
get_query_stop_from_params, get_query_window_seconds_from_params, get_user_id_from_params,
},
};
use anyhow::Context;
use axum::{
@ -9,7 +13,12 @@ use axum::{
response::Response,
TypedHeader,
};
use chrono::{DateTime, FixedOffset};
use fstrings::{f, format_args_f};
use hashbrown::HashMap;
use influxdb2::models::Query;
use influxdb2::{Client, FromDataPoint};
use serde_json::json;
pub async fn query_user_stats<'a>(
app: &'a Web3ProxyApp,
@ -37,5 +46,55 @@ pub async fn query_user_stats<'a>(
let user_id =
get_user_id_from_params(&mut redis_conn, &db_conn, &db_replica, bearer, params).await?;
let query_window_seconds = get_query_window_seconds_from_params(params)?;
let query_start = get_query_start_from_params(params)?.timestamp();
let query_stop = get_query_stop_from_params(params)?.timestamp();
let chain_id = get_chain_id_from_params(app, params)?;
let page = get_page_from_params(params)?;
let measurement = if user_id == 0 {
"global_proxy"
} else {
"opt_in_proxy"
};
let bucket = "web3_proxy";
let mut group_columns = vec!["_measurement", "_field"];
let mut filter_chain_id = "".to_string();
if chain_id == 0 {
group_columns.push("chain_id");
} else {
filter_chain_id = f!(r#"|> filter(fn: (r) => r["chain_id"] == "{chain_id}")"#);
}
let group_columns = serde_json::to_string(&json!(group_columns)).unwrap();
let group = match stat_response_type {
StatType::Aggregated => f!(r#"|> group(columns: {group_columns})"#),
StatType::Detailed => "".to_string(),
};
let filter_field = match stat_response_type {
StatType::Aggregated => f!(r#"|> filter(fn: (r) => r["_field"] == "frontend_requests")"#),
StatType::Detailed => "".to_string(),
};
let query = f!(r#"
from(bucket: "{bucket}")
|> range(start: {query_start}, stop: {query_stop})
|> filter(fn: (r) => r["_measurement"] == "{measurement}")
{filter_field}
{filter_chain_id}
{group}
|> aggregateWindow(every: {query_window_seconds}, fn: mean, createEmpty: false)
|> yield(name: "mean")
"#);
let query = Query::new(query);
// let res: Vec<_> = influxdb_client.query(Some(query)).await?;
todo!();
}

@ -1,5 +1,5 @@
//! Store "stats" in a database for billing and a different database for graphing
//!
//!
//! TODO: move some of these structs/functions into their own file?
pub mod db_queries;
pub mod influxdb_queries;