diff --git a/entities/src/rpc_accounting.rs b/entities/src/rpc_accounting.rs
index a615e6ff..70c79ab3 100644
--- a/entities/src/rpc_accounting.rs
+++ b/entities/src/rpc_accounting.rs
@@ -40,6 +40,7 @@ pub struct Model {
     pub max_response_bytes: u64,
     pub archive_request: bool,
     pub origin: Option<String>,
+    pub migrated: Option<DateTimeUtc>
 }
 
 #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
diff --git a/scripts/manual-tests/21-sql-migration-make-backup.sh b/scripts/manual-tests/21-sql-migration-make-backup.sh
new file mode 100644
index 00000000..4a43d5a3
--- /dev/null
+++ b/scripts/manual-tests/21-sql-migration-make-backup.sh
@@ -0,0 +1 @@
+mysqldump -u root --password=dev_web3_proxy -h 127.0.0.1 --port 13306
diff --git a/scripts/manual-tests/21-sql-migration-verify-test-queries.sql b/scripts/manual-tests/21-sql-migration-verify-test-queries.sql
new file mode 100644
index 00000000..248e4f5e
--- /dev/null
+++ b/scripts/manual-tests/21-sql-migration-verify-test-queries.sql
@@ -0,0 +1,17 @@
+SELECT COUNT(*) FROM rpc_accounting WHERE migrated IS NULL;
+UPDATE rpc_accounting SET migrated = NULL;
+
+SELECT SUM(frontend_requests) FROM rpc_accounting;
+SELECT SUM(frontend_requests) FROM rpc_accounting_v2;
+
+SELECT SUM(backend_requests) FROM rpc_accounting;
+SELECT SUM(backend_requests) FROM rpc_accounting_v2;
+
+SELECT SUM(sum_request_bytes) FROM rpc_accounting;
+SELECT SUM(sum_request_bytes) FROM rpc_accounting_v2;
+
+SELECT SUM(sum_response_millis) FROM rpc_accounting;
+SELECT SUM(sum_response_millis) FROM rpc_accounting_v2;
+
+SELECT SUM(sum_response_bytes) FROM rpc_accounting;
+SELECT SUM(sum_response_bytes) FROM rpc_accounting_v2;
diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs
index 3834eb2a..1c32810c 100644
--- a/web3_proxy/src/app/mod.rs
+++ b/web3_proxy/src/app/mod.rs
@@ -67,7 +67,7 @@ pub static APP_USER_AGENT: &str = concat!(
 );
 
 // aggregate across 1 week
-const BILLING_PERIOD_SECONDS: i64 = 60 * 60 * 24 * 7;
+pub const BILLING_PERIOD_SECONDS: i64 = 60 * 60 * 24 * 7;
 
 #[derive(Debug, From)]
 struct ResponseCacheKey {
@@ -575,7 +575,12 @@ impl Web3ProxyApp {
         // stats can be saved in mysql, influxdb, both, or none
         let stat_sender = if let Some(emitter_spawn) = StatBuffer::try_spawn(
             top_config.app.chain_id,
-            top_config.app.influxdb_bucket.clone().context("No influxdb bucket was provided")?.to_owned(),
+            top_config
+                .app
+                .influxdb_bucket
+                .clone()
+                .context("No influxdb bucket was provided")?
+ .to_owned(), db_conn.clone(), influxdb_client.clone(), 60, @@ -812,26 +817,27 @@ impl Web3ProxyApp { app_handles.push(config_handle); } -// ======= -// if important_background_handles.is_empty() { -// info!("no important background handles"); -// -// let f = tokio::spawn(async move { -// let _ = background_shutdown_receiver.recv().await; -// -// Ok(()) -// }); -// -// important_background_handles.push(f); -// >>>>>>> 77df3fa (stats v2) + // ======= + // if important_background_handles.is_empty() { + // info!("no important background handles"); + // + // let f = tokio::spawn(async move { + // let _ = background_shutdown_receiver.recv().await; + // + // Ok(()) + // }); + // + // important_background_handles.push(f); + // >>>>>>> 77df3fa (stats v2) Ok(( app, app_handles, important_background_handles, new_top_config_sender, - consensus_connections_watcher - ).into()) + consensus_connections_watcher, + ) + .into()) } pub async fn apply_top_config(&self, new_top_config: TopConfig) -> anyhow::Result<()> { @@ -1793,7 +1799,7 @@ impl Web3ProxyApp { if let Some(stat_sender) = self.stat_sender.as_ref() { let response_stat = RpcQueryStats::new( - method.to_string(), + Some(method.to_string()), authorization.clone(), request_metadata, response.num_bytes(), @@ -1816,7 +1822,7 @@ impl Web3ProxyApp { if let Some(stat_sender) = self.stat_sender.as_ref() { let response_stat = RpcQueryStats::new( - request_method, + Some(request_method), authorization.clone(), request_metadata, response.num_bytes(), diff --git a/web3_proxy/src/app/ws.rs b/web3_proxy/src/app/ws.rs index b69cdcc9..037e560b 100644 --- a/web3_proxy/src/app/ws.rs +++ b/web3_proxy/src/app/ws.rs @@ -96,7 +96,7 @@ impl Web3ProxyApp { if let Some(stat_sender) = stat_sender.as_ref() { let response_stat = RpcQueryStats::new( - "eth_subscription(newHeads)".to_string(), + Some("eth_subscription(newHeads)".to_string()), authorization.clone(), request_metadata.clone(), response_bytes, @@ -167,7 +167,7 @@ impl Web3ProxyApp { if let Some(stat_sender) = stat_sender.as_ref() { let response_stat = RpcQueryStats::new( - "eth_subscription(newPendingTransactions)".to_string(), + Some("eth_subscription(newPendingTransactions)".to_string()), authorization.clone(), request_metadata.clone(), response_bytes, @@ -243,7 +243,7 @@ impl Web3ProxyApp { if let Some(stat_sender) = stat_sender.as_ref() { let response_stat = RpcQueryStats::new( - "eth_subscription(newPendingFullTransactions)".to_string(), + Some("eth_subscription(newPendingFullTransactions)".to_string()), authorization.clone(), request_metadata.clone(), response_bytes, @@ -319,7 +319,7 @@ impl Web3ProxyApp { if let Some(stat_sender) = stat_sender.as_ref() { let response_stat = RpcQueryStats::new( - "eth_subscription(newPendingRawTransactions)".to_string(), + Some("eth_subscription(newPendingRawTransactions)".to_string()), authorization.clone(), request_metadata.clone(), response_bytes, @@ -350,7 +350,7 @@ impl Web3ProxyApp { if let Some(stat_sender) = self.stat_sender.as_ref() { let response_stat = RpcQueryStats::new( - request_json.method.clone(), + Some(request_json.method.clone()), authorization.clone(), request_metadata, response.num_bytes(), diff --git a/web3_proxy/src/bin/web3_proxy_cli/main.rs b/web3_proxy/src/bin/web3_proxy_cli/main.rs index 88b64a0e..7cbeef83 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/main.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/main.rs @@ -374,12 +374,17 @@ fn main() -> anyhow::Result<()> { x.main(&db_conn).await } SubCommand::MigrateStatsToV2(x) => { + + let top_config = 
top_config.expect("--config is required to run the migration from stats-mysql to stats-influx");
+            // let top_config_path =
+            //     top_config_path.expect("path must be set if top_config exists");
+
             let db_url = cli_config
                 .db_url
                 .expect("'--config' (with a db) or '--db-url' is required to run the migration from stats-mysql to stats-influx");
 
             let db_conn = get_db(db_url, 1, 1).await?;
 
-            x.main(&db_conn).await
+            x.main(top_config, &db_conn).await
         }
         SubCommand::Pagerduty(x) => {
             if cli_config.sentry_url.is_none() {
diff --git a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs
index 88d19dde..385a6aa0 100644
--- a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs
+++ b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs
@@ -1,27 +1,135 @@
 use anyhow::Context;
 use argh::FromArgs;
-use entities::{rpc_accounting, rpc_accounting_v2, user};
+use chrono::{DateTime, Utc};
+use entities::{rpc_accounting, rpc_accounting_v2, rpc_key, user};
 use ethers::types::Address;
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
 use hashbrown::HashMap;
-use log::{debug, info, warn};
+use log::{debug, error, info, trace, warn};
 use migration::sea_orm::{
     self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel,
-    QueryFilter, QuerySelect
+    QueryFilter, QuerySelect, UpdateResult,
 };
-use web3_proxy::stats::{BufferedRpcQueryStats, RpcQueryKey};
+use migration::{Expr, Value};
+use std::mem::swap;
+use std::net::{IpAddr, Ipv4Addr};
+use std::num::NonZeroU64;
+use std::sync::{Arc, Mutex};
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use tokio::sync::broadcast;
+use tokio::time::{sleep, Instant};
+use web3_proxy::app::{AuthorizationChecks, Web3ProxyApp, BILLING_PERIOD_SECONDS};
+use web3_proxy::config::TopConfig;
+use web3_proxy::frontend::authorization::{
+    Authorization, AuthorizationType, RequestMetadata, RpcSecretKey,
+};
+use web3_proxy::stats::{BufferedRpcQueryStats, RpcQueryKey, RpcQueryStats, StatBuffer};
+
+// Helper function to go from DateTime to Instant
+fn datetime_utc_to_instant(datetime: DateTime<Utc>) -> anyhow::Result<Instant> {
+    let epoch = datetime.timestamp(); // Get the Unix timestamp
+    let nanos = datetime.timestamp_subsec_nanos();
+
+    let duration_since_epoch = Duration::new(epoch as u64, nanos);
+    // let duration_since_datetime = Duration::new(, nanos);
+    let instant_new = Instant::now();
+    warn!("Instant new is: {:?}", instant_new);
+    let unix_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
+    warn!("Instant since unix epoch is: {:?}", unix_epoch);
+
+    instant_new
+        .checked_sub(unix_epoch)
+        .context("Could not subtract unix epoch from instant now")?
+        .checked_add(duration_since_epoch)
+        .context("Could not add duration since epoch for updated time")
+}
 
 /// change a user's address.
 #[derive(FromArgs, PartialEq, Eq, Debug)]
 #[argh(subcommand, name = "migrate_stats_to_v2")]
 pub struct MigrateStatsToV2 {}
 
+// I mean drop(sender) and then important_background_handle.await. No need for shutdown signal here I think.
+// Don't make data lossy
+
 impl MigrateStatsToV2 {
-    pub async fn main(self, db_conn: &DatabaseConnection) -> anyhow::Result<()> {
+    pub async fn main(
+        self,
+        top_config: TopConfig,
+        db_conn: &DatabaseConnection,
+    ) -> anyhow::Result<()> {
+        // Also add influxdb container ...
+ // let mut spawned_app = + // Web3ProxyApp::spawn(top_config.clone(), 2, app_shutdown_sender.clone()).await?; + + let number_of_rows_to_process_at_once = 500; + + // we wouldn't really need this, but let's spawn this anyways + // easier than debugging the rest I suppose + let (app_shutdown_sender, _app_shutdown_receiver) = broadcast::channel(1); + let rpc_account_shutdown_recevier = app_shutdown_sender.subscribe(); + + // we must wait for these to end on their own (and they need to subscribe to shutdown_sender) + let mut important_background_handles = FuturesUnordered::new(); + + // Spawn the influxdb + let influxdb_client = match top_config.app.influxdb_host.as_ref() { + Some(influxdb_host) => { + let influxdb_org = top_config + .app + .influxdb_org + .clone() + .expect("influxdb_org needed when influxdb_host is set"); + let influxdb_token = top_config + .app + .influxdb_token + .clone() + .expect("influxdb_token needed when influxdb_host is set"); + + let influxdb_client = + influxdb2::Client::new(influxdb_host, influxdb_org, influxdb_token); + + // TODO: test the client now. having a stat for "started" can be useful on graphs to mark deploys + + Some(influxdb_client) + } + None => None, + }; + + // Spawn the stat-sender + let stat_sender = if let Some(emitter_spawn) = StatBuffer::try_spawn( + top_config.app.chain_id, + top_config + .app + .influxdb_bucket + .clone() + .context("No influxdb bucket was provided")? + .to_owned(), + Some(db_conn.clone()), + influxdb_client.clone(), + 30, + 1, + BILLING_PERIOD_SECONDS, + rpc_account_shutdown_recevier, + )? { + // since the database entries are used for accounting, we want to be sure everything is saved before exiting + important_background_handles.push(emitter_spawn.background_handle); + + Some(emitter_spawn.stat_sender) + } else { + None + }; + + // Basically spawn the full app, look at web3_proxy CLI while true { - // (1) Load a batch of rows out of the old table until no more rows are left - let old_records = rpc_accounting::Entity::find().limit(10000).all(db_conn).await?; + let old_records = rpc_accounting::Entity::find() + .filter(rpc_accounting::Column::Migrated.is_null()) + .limit(number_of_rows_to_process_at_once) + .all(db_conn) + .await?; if old_records.len() == 0 { // Break out of while loop once all records have successfully been migrated ... warn!("All records seem to have been successfully migrated!"); @@ -29,93 +137,208 @@ impl MigrateStatsToV2 { } // (2) Create request metadata objects to match the old data - let mut global_timeseries_buffer = HashMap::::new(); - let mut opt_in_timeseries_buffer = HashMap::::new(); - let mut accounting_db_buffer = HashMap::::new(); - // Iterate through all old rows, and put them into the above objects. - for x in old_records { + for x in old_records.iter() { + // info!("Preparing for migration: {:?}", x); - info!("Preparing for migration: {:?}", x); + // TODO: Split up a single request into multiple requests ... + // according to frontend-requests, backend-requests, etc. 
- // // For each of the old rows, create a (i) RpcQueryKey and a matching BufferedRpcQueryStats object - // let key = RpcQueryKey { - // response_timestamp: x.period_datetime.timestamp(), - // archive_needed: x.archive_needed, - // error_response: x.error_response, - // period_datetime: x.period_datetime.timestamp(), - // rpc_secret_key_id: x.rpc_key_id, - // origin: x.origin, - // method: x.method - // }; - // - // // Create the corresponding BufferedRpcQueryStats object - // let val = BufferedRpcQueryStats { - // frontend_requests: x.frontend_requests, - // backend_requests: x.backend_requests, - // backend_retries: x.backend_retries, - // no_servers: x.no_servers, - // cache_misses: x.cache_misses, - // cache_hits: x.cache_hits, - // sum_request_bytes: x.sum_request_bytes, - // sum_response_bytes: x.sum_response_bytes, - // sum_response_millis: x.sum_response_millis - // }; - // - // // TODO: Create authorization, request metadata, and bytes ... but bytes we don't really keep track of! - // // We can generate dummy bytes of the same length though, this may work as well - // - // // TODO: Period datetime is also a question of what it is - // // let response_stat = RpcQueryStats::new( - // // x.method, - // // authorization.clone(), - // // request_metadata.clone(), - // // response_bytes, - // // x.period_datetime - // // ); - // - // // BufferedRpcQueryStats + // Get the rpc-key from the rpc_key_id + // Get the user-id from the rpc_key_id + let authorization_checks = match x.rpc_key_id { + Some(rpc_key_id) => { + let rpc_key_obj = rpc_key::Entity::find() + .filter(rpc_key::Column::Id.eq(rpc_key_id)) + .one(db_conn) + .await? + .context("Could not find rpc_key_obj for the given rpc_key_id")?; + + // TODO: Create authrization + // We can probably also randomly generate this, as we don't care about the user (?) + AuthorizationChecks { + user_id: rpc_key_obj.user_id, + rpc_secret_key: Some(RpcSecretKey::Uuid(rpc_key_obj.secret_key)), + rpc_secret_key_id: Some( + NonZeroU64::new(rpc_key_id) + .context("Could not use rpc_key_id to create a u64")?, + ), + ..Default::default() + } + } + None => AuthorizationChecks { + ..Default::default() + }, + }; + + // Then overwrite rpc_key_id and user_id (?) + let authorization_type = AuthorizationType::Frontend; + let authorization = Arc::new( + Authorization::try_new( + authorization_checks, + None, + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + None, + None, + None, + authorization_type, + ) + .context("Initializing Authorization Struct was not successful")?, + ); + + // It will be like a fork basically (to simulate getting multiple single requests ...) + // Iterate through all frontend requests + // For each frontend request, create one object that will be emitted (make sure the timestamp is new) + let n = x.frontend_requests; + + for i in 0..n { + // info!("Creating a new frontend request"); + + // Collect all requests here ... + let mut int_request_bytes = (x.sum_request_bytes / n); + if i == 0 { + int_request_bytes += (x.sum_request_bytes % n); + } + + let mut int_response_bytes = (x.sum_response_bytes / n); + if i == 0 { + int_response_bytes += (x.sum_response_bytes % n); + } + + let mut int_response_millis = (x.sum_response_millis / n); + if i == 0 { + int_response_millis += (x.sum_response_millis % n); + } + + let mut int_backend_requests = (x.backend_requests / n); + if i == 0 { + int_backend_requests += (x.backend_requests % n); + } + + // Add module at the last step to include for any remained that we missed ... (?) 
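// A minimal sketch of the split performed in the loop above, under the assumption that the
// goal is only to preserve column totals: each aggregated row is divided evenly across its
// `frontend_requests`, and chunk 0 absorbs the remainder so the per-request values still sum
// back to the original row. The helper name is hypothetical (not part of this patch); the
// patch inlines the same arithmetic for each field.
//
// /// Value of chunk `i` when `total` is split across `n` chunks (requires n > 0);
// /// chunk 0 takes the remainder.
// fn split_with_remainder(total: u64, n: u64, i: u64) -> u64 {
//     let base = total / n;
//     if i == 0 {
//         base + total % n
//     } else {
//         base
//     }
// }
//
// // For example, total = 10 and n = 3 yields 4, 3, 3, which sums back to 10.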
+ + // TODO: Create RequestMetadata + let request_metadata = RequestMetadata { + start_instant: Instant::now(), // This is overwritten later on + request_bytes: int_request_bytes.into(), // Get the mean of all the request bytes + archive_request: x.archive_request.into(), + backend_requests: Default::default(), // This is not used, instead we modify the field later + no_servers: 0.into(), // This is not relevant in the new version + error_response: x.error_response.into(), + response_bytes: int_response_bytes.into(), + response_millis: int_response_millis.into(), + // We just don't have this data + response_from_backup_rpc: false.into(), // I think we did not record this back then // Default::default() + }; + + // (3) Send through a channel to a stat emitter + // Send it to the stats sender + if let Some(stat_sender_ref) = stat_sender.as_ref() { + // info!("Method is: {:?}", x.clone().method); + let mut response_stat = RpcQueryStats::new( + x.clone().method, + authorization.clone(), + Arc::new(request_metadata), + (int_response_bytes) + .try_into() + .context("sum bytes average is not calculated properly")?, + ); + // Modify the timestamps .. + response_stat.modify_struct( + int_response_millis, + x.period_datetime.timestamp(), // I suppose timestamp is millis as well ... should check this in the (prod) database + int_backend_requests, + ); + // info!("Sending stats: {:?}", response_stat); + stat_sender_ref + // .send(response_stat.into()) + .send_async(response_stat.into()) + .await + .context("stat_sender sending response_stat")?; + // info!("Send! {:?}", stat_sender); + } else { + panic!("Stat sender was not spawned!"); + } + + // Create a new stats object + // Add it to the StatBuffer + } + + // Let the stat_sender spawn / flush ... + // spawned_app.app.stat_sender.aggregate_and_save_loop() + // Send a signal to save ... } + // (3) Await that all items are properly processed + // TODO: Await all the background handles + + // Only after this mark all the items as processed / completed + + // If the items are in rpc_v2, delete the initial items from the database + + // return Ok(()); + + // (4) Update the batch in the old table with the current timestamp (Mark the batch as migrated) + let old_record_ids = old_records.iter().map(|x| x.id); + let update_result: UpdateResult = rpc_accounting::Entity::update_many() + .col_expr( + rpc_accounting::Column::Migrated, + Expr::value(Value::ChronoDateTimeUtc(Some(Box::new( + chrono::offset::Utc::now(), + )))), + ) + .filter(rpc_accounting::Column::Id.is_in(old_record_ids)) + // .set(pear) + .exec(db_conn) + .await?; + + info!("Update result is: {:?}", update_result); + + // (N-1) Mark the batch as migrated + // break; } + info!( + "Background handles (2) are: {:?}", + important_background_handles + ); + // Drop the handle + // Send the shutdown signal here (?) + // important_background_handles.clear(); + // Finally also send a shutdown signal + drop(stat_sender); + // match app_shutdown_sender.send(()) { + // Err(x) => { + // panic!("Could not send shutdown signal! {:?}", x); + // } + // _ => {} + // }; - // (3) Update the batch in the old table with the current timestamp + // TODO: Should we also write a short verifier if the migration was successful (?) 
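// On the timestamp question above: chrono's `DateTime::timestamp()` returns whole seconds
// since the Unix epoch, not milliseconds; `timestamp_millis()` is the millisecond variant.
// A small illustration (the example date is arbitrary, not taken from this patch):
//
// use chrono::{TimeZone, Utc};
//
// fn timestamp_units_example() {
//     let dt = Utc.with_ymd_and_hms(2023, 1, 1, 0, 0, 0).unwrap();
//     assert_eq!(dt.timestamp(), 1_672_531_200); // seconds
//     assert_eq!(dt.timestamp_millis(), 1_672_531_200_000); // milliseconds
// }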
- // (4) Send through a channel to a stat emitter + // Drop the background handle, wait for any tasks that are on-going + while let Some(x) = important_background_handles.next().await { + info!("Returned item is: {:?}", x); + match x { + Err(e) => { + error!("{:?}", e); + } + Ok(Err(e)) => { + error!("{:?}", e); + } + Ok(Ok(_)) => { + // TODO: how can we know which handle exited? + info!("a background handle exited"); + // Pop it in this case? + continue; + } + } + } - - - // let old_address: Address = self.old_address.parse()?; - // let new_address: Address = self.new_address.parse()?; - // - // let old_address: Vec = old_address.to_fixed_bytes().into(); - // let new_address: Vec = new_address.to_fixed_bytes().into(); - // - // let u = user::Entity::find() - // .filter(user::Column::Address.eq(old_address)) - // .one(db_conn) - // .await? - // .context("No user found with that address")?; - // - // debug!("initial user: {:#?}", u); - // - // if u.address == new_address { - // info!("user already has this address"); - // } else { - // let mut u = u.into_active_model(); - // - // u.address = sea_orm::Set(new_address); - // - // let u = u.save(db_conn).await?; - // - // info!("changed user address"); - // - // debug!("updated user: {:#?}", u); - // } + // info!("Here (?)"); Ok(()) } diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index 072ad854..968e5eca 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -400,7 +400,7 @@ async fn handle_socket_payload( if let Some(stat_sender) = app.stat_sender.as_ref() { let response_stat = RpcQueryStats::new( - json_request.method.clone(), + Some(json_request.method.clone()), authorization.clone(), request_metadata, response.num_bytes(), diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 1a1b8354..0c2528a8 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -34,6 +34,7 @@ use std::sync::atomic::{self, Ordering}; use std::sync::Arc; use std::{cmp, fmt}; use thread_fast_rng::rand::seq::SliceRandom; +use tokio; use tokio::sync::{broadcast, watch}; use tokio::task; use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior}; @@ -167,7 +168,8 @@ impl Web3Rpcs { .max_capacity(10_000) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); - let (watch_consensus_rpcs_sender, consensus_connections_watcher) = watch::channel(Default::default()); + let (watch_consensus_rpcs_sender, consensus_connections_watcher) = + watch::channel(Default::default()); // by_name starts empty. 
self.apply_server_configs will add to it let by_name = Default::default(); @@ -318,43 +320,43 @@ impl Web3Rpcs { } } -// <<<<<<< HEAD + // <<<<<<< HEAD Ok(()) } -// ======= -// // TODO: max_capacity and time_to_idle from config -// // all block hashes are the same size, so no need for weigher -// let block_hashes = Cache::builder() -// .time_to_idle(Duration::from_secs(600)) -// .max_capacity(10_000) -// .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); -// // all block numbers are the same size, so no need for weigher -// let block_numbers = Cache::builder() -// .time_to_idle(Duration::from_secs(600)) -// .max_capacity(10_000) -// .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); -// -// let (watch_consensus_connections_sender, consensus_connections_watcher) = -// watch::channel(Default::default()); -// -// let watch_consensus_head_receiver = -// watch_consensus_head_sender.as_ref().map(|x| x.subscribe()); -// -// let connections = Arc::new(Self { -// by_name: connections, -// watch_consensus_rpcs_sender: watch_consensus_connections_sender, -// watch_consensus_head_receiver, -// pending_transactions, -// block_hashes, -// block_numbers, -// min_sum_soft_limit, -// min_head_rpcs, -// max_block_age, -// max_block_lag, -// }); -// -// let authorization = Arc::new(Authorization::internal(db_conn.clone())?); -// >>>>>>> 77df3fa (stats v2) + // ======= + // // TODO: max_capacity and time_to_idle from config + // // all block hashes are the same size, so no need for weigher + // let block_hashes = Cache::builder() + // .time_to_idle(Duration::from_secs(600)) + // .max_capacity(10_000) + // .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); + // // all block numbers are the same size, so no need for weigher + // let block_numbers = Cache::builder() + // .time_to_idle(Duration::from_secs(600)) + // .max_capacity(10_000) + // .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); + // + // let (watch_consensus_connections_sender, consensus_connections_watcher) = + // watch::channel(Default::default()); + // + // let watch_consensus_head_receiver = + // watch_consensus_head_sender.as_ref().map(|x| x.subscribe()); + // + // let connections = Arc::new(Self { + // by_name: connections, + // watch_consensus_rpcs_sender: watch_consensus_connections_sender, + // watch_consensus_head_receiver, + // pending_transactions, + // block_hashes, + // block_numbers, + // min_sum_soft_limit, + // min_head_rpcs, + // max_block_age, + // max_block_lag, + // }); + // + // let authorization = Arc::new(Authorization::internal(db_conn.clone())?); + // >>>>>>> 77df3fa (stats v2) pub fn get(&self, conn_name: &str) -> Option> { self.by_name.read().get(conn_name).cloned() @@ -364,12 +366,12 @@ impl Web3Rpcs { self.by_name.read().len() } -// <<<<<<< HEAD + // <<<<<<< HEAD pub fn is_empty(&self) -> bool { self.by_name.read().is_empty() -// ======= -// Ok((connections, handle, consensus_connections_watcher)) -// >>>>>>> 77df3fa (stats v2) + // ======= + // Ok((connections, handle, consensus_connections_watcher)) + // >>>>>>> 77df3fa (stats v2) } pub fn min_head_rpcs(&self) -> usize { @@ -395,7 +397,7 @@ impl Web3Rpcs { let clone = self.clone(); let authorization = authorization.clone(); let pending_tx_id_receiver = self.pending_tx_id_receiver.clone(); - let handle = task::spawn(async move { + let handle = tokio::task::spawn(async move { // TODO: set up this future the same as the block funnel while let Ok((pending_tx_id, rpc)) = 
pending_tx_id_receiver.recv_async().await { let f = clone.clone().process_incoming_tx_id( @@ -418,7 +420,7 @@ impl Web3Rpcs { let connections = Arc::clone(&self); let pending_tx_sender = pending_tx_sender.clone(); - let handle = task::Builder::default() + let handle = tokio::task::Builder::default() .name("process_incoming_blocks") .spawn(async move { connections @@ -432,12 +434,14 @@ impl Web3Rpcs { if futures.is_empty() { // no transaction or block subscriptions. - let handle = task::Builder::default().name("noop").spawn(async move { - loop { - sleep(Duration::from_secs(600)).await; - // TODO: "every interval, check that the provider is still connected" - } - })?; + let handle = tokio::task::Builder::default() + .name("noop") + .spawn(async move { + loop { + sleep(Duration::from_secs(600)).await; + // TODO: "every interval, check that the provider is still connected" + } + })?; futures.push(flatten_handle(handle)); } @@ -884,11 +888,11 @@ impl Web3Rpcs { // TODO: maximum retries? right now its the total number of servers loop { -// <<<<<<< HEAD + // <<<<<<< HEAD if skip_rpcs.len() >= self.by_name.read().len() { -// ======= -// if skip_rpcs.len() == self.by_name.len() { -// >>>>>>> 77df3fa (stats v2) + // ======= + // if skip_rpcs.len() == self.by_name.len() { + // >>>>>>> 77df3fa (stats v2) break; } @@ -1159,18 +1163,18 @@ impl Web3Rpcs { request_metadata.no_servers.fetch_add(1, Ordering::Release); } -// <<<<<<< HEAD + // <<<<<<< HEAD watch_consensus_rpcs.changed().await?; watch_consensus_rpcs.borrow_and_update(); -// ======= + // ======= // TODO: i don't think this will ever happen // TODO: return a 502? if it does? // return Err(anyhow::anyhow!("no available rpcs!")); // TODO: sleep how long? // TODO: subscribe to something in ConsensusWeb3Rpcs instead sleep(Duration::from_millis(200)).await; -// >>>>>>> 77df3fa (stats v2) + // >>>>>>> 77df3fa (stats v2) continue; } @@ -1299,13 +1303,13 @@ mod tests { // TODO: why is this allow needed? does tokio::test get in the way somehow? #![allow(unused_imports)] - use std::time::{SystemTime, UNIX_EPOCH}; use super::*; use crate::rpcs::consensus::ConsensusFinder; use crate::rpcs::{blockchain::Web3ProxyBlock, provider::Web3Provider}; use ethers::types::{Block, U256}; use log::{trace, LevelFilter}; use parking_lot::RwLock; + use std::time::{SystemTime, UNIX_EPOCH}; use tokio::sync::RwLock as AsyncRwLock; #[tokio::test] diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index 99cb77df..097b57e2 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -35,11 +35,12 @@ pub enum StatType { #[derive(Clone, Debug)] pub struct RpcQueryStats { pub authorization: Arc, - pub method: String, + pub method: Option, pub archive_request: bool, pub error_response: bool, pub request_bytes: u64, /// if backend_requests is 0, there was a cache_hit + // pub frontend_request: u64, pub backend_requests: u64, pub response_bytes: u64, pub response_millis: u64, @@ -96,7 +97,7 @@ impl RpcQueryStats { TrackingLevel::Detailed => { // detailed tracking keeps track of the method and origin // depending on the request, the origin might still be None - let method = Some(self.method.clone()); + let method = self.method.clone(); let origin = self.authorization.origin.clone(); (method, origin) @@ -116,7 +117,7 @@ impl RpcQueryStats { /// all queries are aggregated /// TODO: should we store "anon" or "registered" as a key just to be able to split graphs? 
fn global_timeseries_key(&self) -> RpcQueryKey { - let method = Some(self.method.clone()); + let method = self.method.clone(); // we don't store origin in the timeseries db. its only used for optional accounting let origin = None; // everyone gets grouped together @@ -140,7 +141,7 @@ impl RpcQueryStats { TrackingLevel::None => { // this RPC key requested no tracking. this is the default. // we still want graphs though, so we just use None as the rpc_secret_key_id - (Some(self.method.clone()), None) + (self.method.clone(), None) } TrackingLevel::Aggregated => { // this RPC key requested tracking aggregated across all methods @@ -149,7 +150,7 @@ impl RpcQueryStats { TrackingLevel::Detailed => { // detailed tracking keeps track of the method ( - Some(self.method.clone()), + self.method.clone(), self.authorization.checks.rpc_secret_key_id, ) } @@ -361,10 +362,10 @@ impl BufferedRpcQueryStats { impl RpcQueryStats { pub fn new( - method: String, + method: Option, authorization: Arc, metadata: Arc, - response_bytes: usize + response_bytes: usize, ) -> Self { // TODO: try_unwrap the metadata to be sure that all the stats for this request have been collected // TODO: otherwise, i think the whole thing should be in a single lock that we can "reset" when a stat is created @@ -391,6 +392,17 @@ impl RpcQueryStats { } } + /// Only used for migration from stats_v1 to stats_v2/v3 + pub fn modify_struct( + &mut self, + response_millis: u64, + response_timestamp: i64, + backend_requests: u64, + ) { + self.response_millis = response_millis; + self.response_timestamp = response_timestamp; + self.backend_requests = backend_requests; + } } impl StatBuffer { @@ -450,6 +462,7 @@ impl StatBuffer { loop { tokio::select! { stat = stat_receiver.recv_async() => { + // info!("Received stat"); // save the stat to a buffer match stat { Ok(AppStat::RpcQuery(stat)) => { @@ -476,6 +489,7 @@ impl StatBuffer { } } _ = db_save_interval.tick() => { + // info!("DB save internal tick"); let db_conn = self.db_conn.as_ref().expect("db connection should always exist if there are buffered stats"); // TODO: batch saves @@ -487,6 +501,7 @@ impl StatBuffer { } } _ = tsdb_save_interval.tick() => { + // info!("TSDB save internal tick"); // TODO: batch saves // TODO: better bucket names let influxdb_client = self.influxdb_client.as_ref().expect("influxdb client should always exist if there are buffered stats"); @@ -506,6 +521,7 @@ impl StatBuffer { } } x = shutdown_receiver.recv() => { + info!("shutdown signal ---"); match x { Ok(_) => { info!("stat_loop shutting down"); @@ -544,13 +560,7 @@ impl StatBuffer { for (key, stat) in global_timeseries_buffer.drain() { if let Err(err) = stat - .save_timeseries( - &bucket, - "global_proxy", - self.chain_id, - influxdb_client, - key, - ) + .save_timeseries(&bucket, "global_proxy", self.chain_id, influxdb_client, key) .await { error!( @@ -567,13 +577,7 @@ impl StatBuffer { for (key, stat) in opt_in_timeseries_buffer.drain() { if let Err(err) = stat - .save_timeseries( - &bucket, - "opt_in_proxy", - self.chain_id, - influxdb_client, - key, - ) + .save_timeseries(&bucket, "opt_in_proxy", self.chain_id, influxdb_client, key) .await { error!(