From 25d34da98de82e7cb00c02e86609c3f3aa0dc961 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 3 Oct 2022 18:08:01 +0000 Subject: [PATCH] add stat emitter --- Cargo.lock | 46 +++++++++++++++++++---- TODO.md | 4 +- entities/Cargo.toml | 2 +- migration/Cargo.toml | 2 +- web3_proxy/Cargo.toml | 3 +- web3_proxy/src/app.rs | 61 ++++++++++++++++++++++-------- web3_proxy/src/bin/web3_proxy.rs | 2 +- web3_proxy/src/config.rs | 2 + web3_proxy/src/lib.rs | 1 + web3_proxy/src/stats.rs | 64 ++++++++++++++++++++++++++++++++ 10 files changed, 157 insertions(+), 30 deletions(-) create mode 100644 web3_proxy/src/stats.rs diff --git a/Cargo.lock b/Cargo.lock index c90a3d8f..1d4551f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2479,6 +2479,35 @@ dependencies = [ "regex", ] +[[package]] +name = "influxdb" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39023407f0546c3b30607950f8b600c7db4ef7621fbaa0159de733d73e68b23f" +dependencies = [ + "chrono", + "futures-util", + "http", + "influxdb_derive", + "lazy_static", + "regex", + "reqwest", + "serde", + "serde_json", + "thiserror", +] + +[[package]] +name = "influxdb_derive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d81efbf97cec06c647f05a8b5edcbc52434cdf980d8d4ace68e1028c90241d3" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "inotify" version = "0.9.6" @@ -4053,9 +4082,9 @@ dependencies = [ [[package]] name = "sea-orm" -version = "0.9.2" +version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "089dcca5d2c8393f5f21c7f7b0d84062839c3264ba62dcc0043eb207862e13a1" +checksum = "84c7282fc3d7f79f6c5bd57e603319862fc778bf74118c0ce2a0dc82d9141dee" dependencies = [ "async-stream", "async-trait", @@ -4080,9 +4109,9 @@ dependencies = [ [[package]] name = "sea-orm-cli" -version = "0.9.2" +version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cefd2d8878bd7e8b7313f036725fa3d08585d101fb1bf3adca7fc13f553f906" +checksum = "de07b4a0fc83b1b7ef8a2fe5d42c2662b1c60315446e6ebb4151a301c35fe484" dependencies = [ "async-std", "chrono", @@ -4097,9 +4126,9 @@ dependencies = [ [[package]] name = "sea-orm-macros" -version = "0.9.2" +version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08d9708f945febe8625ccb0334654b97d1a1a0bffa0473d86e6108ad13f79bd5" +checksum = "f96b8a479d25d8110751a0511265556dd9139bc11e342357a98e60910fbb07e3" dependencies = [ "bae", "heck 0.3.3", @@ -4110,9 +4139,9 @@ dependencies = [ [[package]] name = "sea-orm-migration" -version = "0.9.2" +version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70a28587780fbae5c414a62bf0b32405f9da2e000d94f426abf214b2b2e68631" +checksum = "f80e3ebbc654c1915686898de119d33a449a9512567009df0a3e95b1afe4e36c" dependencies = [ "async-trait", "clap", @@ -5525,6 +5554,7 @@ dependencies = [ "handlebars", "hashbrown", "http", + "influxdb", "ipnet", "metered", "migration", diff --git a/TODO.md b/TODO.md index 20a64880..3ff9545c 100644 --- a/TODO.md +++ b/TODO.md @@ -174,8 +174,8 @@ These are roughly in order of completition - [x] get to /, when not serving a websocket, should have a simple welcome page. maybe with a button to update your wallet. - [x] instead of giving a rate limit error code, delay the connection's response at the start. reject if incoming requests is super high? 
- [x] did this by checking a key/ip-specific semaphore before checking rate limits -- [ ] collect active requests per second per api key -- [ ] collect parallel requests per api key +- [ ] collect requests per second per api key +- [ ] collect concurrent requests per api key - [ ] collect distribution of methods per api key (eth_call, eth_getLogs, etc.) - [ ] display key stats on an endpoint that requires authentication - [-] let users choose a % to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly diff --git a/entities/Cargo.toml b/entities/Cargo.toml index 5e1da31f..f37d5d67 100644 --- a/entities/Cargo.toml +++ b/entities/Cargo.toml @@ -10,6 +10,6 @@ path = "src/mod.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -sea-orm = "0.9.2" +sea-orm = "0.9.3" serde = "1.0.145" uuid = "1.1.2" diff --git a/migration/Cargo.toml b/migration/Cargo.toml index 535281a2..fc3968f6 100644 --- a/migration/Cargo.toml +++ b/migration/Cargo.toml @@ -12,7 +12,7 @@ path = "src/lib.rs" async-std = { version = "1.12.0", features = ["attributes", "tokio1"] } [dependencies.sea-orm-migration] -version = "0.9.2" +version = "0.9.3" features = [ # Enable at least one `ASYNC_RUNTIME` and `DATABASE_DRIVER` feature if you want to run migration via CLI. # View the list of supported features at https://www.sea-ql.org/SeaORM/docs/install-and-config/database-and-async-runtime. diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index a6ef3acd..c89f875e 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -36,6 +36,7 @@ flume = "0.10.14" futures = { version = "0.3.24", features = ["thread-pool"] } hashbrown = { version = "0.12.3", features = ["serde"] } http = "0.2.8" +influxdb = { version = "0.5.2", features = ["derive"] } ipnet = "2.5.0" metered = { version = "0.9.0", features = ["serialize"] } moka = { version = "0.9.4", default-features = false, features = ["future"] } @@ -53,7 +54,7 @@ reqwest = { version = "0.11.12", default-features = false, features = ["json", " handlebars = "4.3.4" rustc-hash = "1.1.0" siwe = "0.4.2" -sea-orm = { version = "0.9.2", features = ["macros"] } +sea-orm = { version = "0.9.3", features = ["macros"] } serde = { version = "1.0.145", features = [] } serde_json = { version = "1.0.85", default-features = false, features = ["alloc", "raw_value"] } serde_prometheus = "0.1.6" diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index c1f0e08c..76c804f7 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -11,6 +11,7 @@ use crate::rpcs::blockchain::{ArcBlock, BlockId}; use crate::rpcs::connections::Web3Connections; use crate::rpcs::request::OpenRequestHandleMetrics; use crate::rpcs::transactions::TxStatus; +use crate::stats::StatEmitter; use anyhow::Context; use axum::extract::ws::Message; use axum::headers::{Referer, UserAgent}; @@ -168,6 +169,7 @@ pub async fn get_migrated_db( #[metered(registry = Web3ProxyAppMetrics, registry_expr = self.app_metrics, visibility = pub)] impl Web3ProxyApp { + /// The main entrypoint. 
pub async fn spawn( top_config: TopConfig, num_workers: usize, @@ -181,10 +183,11 @@ impl Web3ProxyApp { "redirect user url must contain \"{{user_id}}\"" ); + // setup metrics let app_metrics = Default::default(); let open_request_handle_metrics: Arc = Default::default(); - // first, we connect to mysql and make sure the latest migrations have run + // connect to mysql and make sure the latest migrations have run let db_conn = if let Some(db_url) = top_config.app.db_url.clone() { let db_min_connections = top_config .app @@ -192,12 +195,12 @@ impl Web3ProxyApp { .unwrap_or(num_workers as u32); // TODO: what default multiple? - let redis_max_connections = top_config + let db_max_connections = top_config .app .db_max_connections .unwrap_or(db_min_connections * 2); - let db = get_migrated_db(db_url, db_min_connections, redis_max_connections).await?; + let db = get_migrated_db(db_url, db_min_connections, db_max_connections).await?; Some(db) } else { @@ -206,19 +209,14 @@ impl Web3ProxyApp { }; let balanced_rpcs = top_config.balanced_rpcs; - - let private_rpcs = if let Some(private_rpcs) = top_config.private_rpcs { - private_rpcs - } else { - Default::default() - }; + let private_rpcs = top_config.private_rpcs.unwrap_or_default(); // TODO: try_join_all instead? let handles = FuturesUnordered::new(); // make a http shared client // TODO: can we configure the connection pool? should we? - // TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server + // TODO: timeouts from config. defaults are hopefully good let http_client = Some( reqwest::ClientBuilder::new() .connect_timeout(Duration::from_secs(5)) @@ -227,6 +225,8 @@ impl Web3ProxyApp { .build()?, ); + // create a connection pool for redis + // a failure to connect does NOT block the application from starting let redis_pool = match top_config.app.redis_url.as_ref() { Some(redis_url) => { // TODO: scrub credentials and then include the redis_url in logs @@ -261,13 +261,36 @@ impl Web3ProxyApp { } }; + // setup a channel here for receiving influxdb stats + // we do this in a channel so we don't slow down our response to the users + // TODO: make influxdb optional + let stat_sender = if let Some(influxdb_url) = top_config.app.influxdb_url.clone() { + let influxdb_name = top_config + .app + .influxdb_name + .clone() + .context("connecting to influxdb")?; + + // TODO: sender and receiver here are a little confusing. because the thing that reads the receiver is what actually submits the stats + let (stat_sender, stat_handle) = + StatEmitter::spawn(influxdb_url, influxdb_name, http_client.clone()); + + handles.push(stat_handle); + + Some(stat_sender) + } else { + warn!("no influxdb connection"); + + None + }; + // TODO: i don't like doing Block::default here! Change this to "None"? let (head_block_sender, head_block_receiver) = watch::channel(Arc::new(Block::default())); // TODO: will one receiver lagging be okay? how big should this be? let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(256); // TODO: use this? it could listen for confirmed transactions and then clear pending_transactions, but the head_block_sender is doing that - // TODO: don't drop the pending_tx_receiver. instead, read it to mark transactions as "seen". once seen, we won't re-send them + // TODO: don't drop the pending_tx_receiver. instead, read it to mark transactions as "seen". once seen, we won't re-send them? // TODO: once a transaction is "Confirmed" we remove it from the map. 
this should prevent major memory leaks. // TODO: we should still have some sort of expiration or maximum size limit for the map drop(pending_tx_receiver); @@ -287,6 +310,7 @@ impl Web3ProxyApp { .weigher(|_k, v| size_of_val(v) as u32) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new()); + // connect to the load balanced rpcs let (balanced_rpcs, balanced_handle) = Web3Connections::spawn( top_config.app.chain_id, balanced_rpcs, @@ -301,16 +325,18 @@ impl Web3ProxyApp { open_request_handle_metrics.clone(), ) .await - .context("balanced rpcs")?; + .context("spawning balanced rpcs")?; + // save the handle to catch any errors handles.push(balanced_handle); + // connect to the private rpcs + // only some chains have this, so this is optional let private_rpcs = if private_rpcs.is_empty() { // TODO: do None instead of clone? warn!("No private relays configured. Any transactions will be broadcast to the public mempool!"); None } else { - // TODO: attach context to this error let (private_rpcs, private_handle) = Web3Connections::spawn( top_config.app.chain_id, private_rpcs, @@ -328,15 +354,16 @@ impl Web3ProxyApp { open_request_handle_metrics.clone(), ) .await - .context("private_rpcs")?; + .context("spawning private_rpcs")?; + // save the handle to catch any errors handles.push(private_handle); Some(private_rpcs) }; - // TODO: setup a channel here for receiving influxdb stats - + // create rate limiters + // these are optional. they require redis let mut frontend_ip_rate_limiter = None; let mut frontend_key_rate_limiter = None; let mut login_rate_limiter = None; @@ -382,6 +409,7 @@ impl Web3ProxyApp { .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new()); // all the users are the same size, so no need for a weigher + // if there is no database of users, there will be no keys and so this will be empty // TODO: max_capacity from config // TODO: ttl from config let user_key_cache = Cache::builder() @@ -389,6 +417,7 @@ impl Web3ProxyApp { .time_to_live(Duration::from_secs(60)) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new()); + // create semaphores for concurrent connection limits // TODO: what should tti be for semaphores? let user_key_semaphores = Cache::builder() .time_to_idle(Duration::from_secs(120)) diff --git a/web3_proxy/src/bin/web3_proxy.rs b/web3_proxy/src/bin/web3_proxy.rs index 64dcad14..521c06c1 100644 --- a/web3_proxy/src/bin/web3_proxy.rs +++ b/web3_proxy/src/bin/web3_proxy.rs @@ -78,7 +78,7 @@ fn run( let prometheus_handle = tokio::spawn(metrics_frontend::serve(app, app_prometheus_port)); // if everything is working, these should both run forever - // TODO: try_join these instead? use signal_shutdown here? + // TODO: try_join these instead? tokio::select! { x = app_handle => { match x { diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index f420806f..5f60e0a3 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -45,6 +45,7 @@ pub struct TopConfig { } /// shared configuration between Web3Connections +// TODO: no String, only &str #[derive(Debug, Default, Deserialize)] pub struct AppConfig { // TODO: better type for chain_id? 
max of `u64::MAX / 2 - 36` https://github.com/ethereum/EIPs/issues/2294
@@ -59,6 +60,7 @@ pub struct AppConfig {
     /// If none, the minimum * 2 is used
     pub db_max_connections: Option<u32>,
     pub influxdb_url: Option<String>,
+    pub influxdb_name: Option<String>,
     pub default_requests_per_minute: Option<u64>,
     pub invite_code: Option<String>,
     #[serde(default = "default_min_sum_soft_limit")]
diff --git a/web3_proxy/src/lib.rs b/web3_proxy/src/lib.rs
index 3b78fc7f..fedf565c 100644
--- a/web3_proxy/src/lib.rs
+++ b/web3_proxy/src/lib.rs
@@ -6,3 +6,4 @@ pub mod jsonrpc;
 pub mod metered;
 pub mod metrics_frontend;
 pub mod rpcs;
+pub mod stats;
diff --git a/web3_proxy/src/stats.rs b/web3_proxy/src/stats.rs
new file mode 100644
index 00000000..6b39d30e
--- /dev/null
+++ b/web3_proxy/src/stats.rs
@@ -0,0 +1,64 @@
+use chrono::{DateTime, Utc};
+use influxdb::InfluxDbWriteable;
+use influxdb::{Client, Query, ReadQuery, Timestamp};
+use tokio::task::JoinHandle;
+use tracing::{error, info};
+
+/// TODO: replace this example stat with our own
+#[derive(InfluxDbWriteable)]
+pub struct WeatherReading {
+    time: DateTime<Utc>,
+    humidity: i32,
+    #[influxdb(tag)]
+    wind_direction: String,
+}
+
+pub enum Web3ProxyStat {
+    WeatherReading(WeatherReading),
+}
+
+impl Web3ProxyStat {
+    fn into_query(self) -> influxdb::WriteQuery {
+        match self {
+            Self::WeatherReading(x) => x.into_query("weather"),
+        }
+    }
+}
+
+pub struct StatEmitter;
+
+impl StatEmitter {
+    pub fn spawn(
+        influxdb_url: String,
+        influxdb_name: String,
+        http_client: Option<reqwest::Client>,
+    ) -> (flume::Sender<Web3ProxyStat>, JoinHandle<anyhow::Result<()>>) {
+        let (tx, rx) = flume::unbounded::<Web3ProxyStat>();
+
+        let client = Client::new(influxdb_url, influxdb_name);
+
+        // use an existing http client
+        let client = if let Some(http_client) = http_client {
+            client.with_http_client(http_client)
+        } else {
+            client
+        };
+
+        let f = async move {
+            while let Ok(x) = rx.recv_async().await {
+                if let Err(err) = client.query(x.into_query()).await {
+                    error!(?err, "failed writing stat");
+                    // TODO: now what?
+                }
+            }
+
+            info!("stat emitter exited");
+
+            Ok(())
+        };
+
+        let handle = tokio::spawn(f);
+
+        (tx, handle)
+    }
+}
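
The stat sender returned by StatEmitter::spawn is handed to Web3ProxyApp so request handling can fire stats into the unbounded flume channel without ever waiting on influxdb. A minimal sketch of driving the emitter end to end, written as a test that could live inside stats.rs; the influxdb URL, database name, and the test itself are hypothetical and not part of this patch (WeatherReading is the placeholder stat from the file above):

    #[cfg(test)]
    mod tests {
        use super::*;
        use chrono::Utc;

        #[tokio::test]
        #[ignore] // talks to an external influxdb; run manually
        async fn emits_a_stat() {
            // hypothetical local influxdb and database name
            let (tx, handle) = StatEmitter::spawn(
                "http://localhost:8086".to_string(),
                "dev_web3_proxy".to_string(),
                None,
            );

            let stat = Web3ProxyStat::WeatherReading(WeatherReading {
                time: Utc::now(),
                humidity: 30,
                wind_direction: "north".to_string(),
            });

            // the channel is unbounded, so sending never blocks the caller
            tx.send(stat).unwrap();

            // dropping the only sender closes the channel and lets the emitter task exit
            drop(tx);
            handle.await.unwrap().unwrap();
        }
    }

If influxdb is unreachable, the write error is only logged and the loop keeps draining the channel, which matches the fire-and-forget intent: a stats outage should never fail user requests.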
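
WeatherReading is only the influxdb crate's example, and the TODO items above ("collect requests per second per api key", "collect concurrent requests per api key") hint at what will replace it. A hypothetical sketch of such a stat using the same derive pattern; the struct name, fields, and measurement name are invented here, not part of the patch:

    // hypothetical per-key request stat; it would be added as a Web3ProxyStat
    // variant and routed in into_query() like the placeholder above
    #[derive(InfluxDbWriteable)]
    pub struct ProxiedRequestStat {
        time: DateTime<Utc>,
        // requests handled during this reporting window
        request_count: u64,
        #[influxdb(tag)]
        user_key_id: String,
        #[influxdb(tag)]
        method: String,
    }

    // e.g. in Web3ProxyStat::into_query:
    //     Self::ProxiedRequest(x) => x.into_query("proxied_request"),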