From 0bb3a2dc0612433dea5000028106e7a377261e9c Mon Sep 17 00:00:00 2001
From: yenicelik
Date: Fri, 31 Mar 2023 12:43:41 +0100
Subject: [PATCH] simple changes around bryans comments

---
 .cargo/config.toml                            |  1 -
 scripts/generate-requests-and-stats.sh        |  1 +
 web3_proxy/src/app/mod.rs                     |  7 +--
 .../bin/web3_proxy_cli/migrate_stats_to_v2.rs | 60 ++++----------
 web3_proxy/src/frontend/authorization.rs      |  2 +-
 web3_proxy/src/rpcs/blockchain.rs             |  5 --
 web3_proxy/src/rpcs/many.rs                   | 33 ++--------
 web3_proxy/src/rpcs/one.rs                    | 20 ++-----
 web3_proxy/src/rpcs/request.rs                |  6 +-
 9 files changed, 33 insertions(+), 102 deletions(-)

diff --git a/.cargo/config.toml b/.cargo/config.toml
index 1ebaa03c..f4ad2dbf 100644
--- a/.cargo/config.toml
+++ b/.cargo/config.toml
@@ -1,7 +1,6 @@
 [build]
 rustflags = [
     # potentially faster. https://nnethercote.github.io/perf-book/build-configuration.html
-    # TODO: we might want to disable this so its easier to run the proxy across different aws instance types
     "-C", "target-cpu=native",
     # tokio unstable is needed for tokio-console
     "--cfg", "tokio_unstable"
diff --git a/scripts/generate-requests-and-stats.sh b/scripts/generate-requests-and-stats.sh
index 58cdf10b..ecae4466 100644
--- a/scripts/generate-requests-and-stats.sh
+++ b/scripts/generate-requests-and-stats.sh
@@ -1,4 +1,5 @@
 # Got eth spam from here
+# https://github.com/shazow/ethspam
 # Got versus from here
 # https://github.com/INFURA/versus
diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs
index 51b17c25..71fc42d8 100644
--- a/web3_proxy/src/app/mod.rs
+++ b/web3_proxy/src/app/mod.rs
@@ -407,7 +407,7 @@ impl Web3ProxyApp {
         num_workers: usize,
         shutdown_sender: broadcast::Sender<()>,
     ) -> anyhow::Result {
-        let rpc_account_shutdown_recevier = shutdown_sender.subscribe();
+        let stat_buffer_shutdown_receiver = shutdown_sender.subscribe();
         let mut background_shutdown_receiver = shutdown_sender.subscribe();
 
         // safety checks on the config
@@ -587,7 +587,7 @@ impl Web3ProxyApp {
             60,
             1,
             BILLING_PERIOD_SECONDS,
-            rpc_account_shutdown_recevier,
+            stat_buffer_shutdown_receiver,
         )? {
             // since the database entries are used for accounting, we want to be sure everything is saved before exiting
             important_background_handles.push(emitter_spawn.background_handle);
@@ -711,9 +711,6 @@ impl Web3ProxyApp {
             .time_to_idle(Duration::from_secs(120))
             .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
 
-        // prepare a Web3Rpcs to hold all our balanced connections
-        // let (balanced_rpcs, balanced_rpcs_handle) = Web3Rpcs::spawn(
-
         // connect to the load balanced rpcs
         let (balanced_rpcs, balanced_handle, consensus_connections_watcher) = Web3Rpcs::spawn(
             top_config.app.chain_id,
             db_conn.clone(),
diff --git a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs
index 385a6aa0..41d880b8 100644
--- a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs
+++ b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs
@@ -7,6 +7,7 @@ use futures::stream::FuturesUnordered;
 use futures::StreamExt;
 use hashbrown::HashMap;
 use log::{debug, error, info, trace, warn};
+use migration::sea_orm::QueryOrder;
 use migration::sea_orm::{
     self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel,
     QueryFilter, QuerySelect, UpdateResult,
 };
@@ -45,25 +46,18 @@ fn datetime_utc_to_instant(datetime: DateTime) -> anyhow::Result {
         .context("Could not add duration since epoch for updated time")
 }
 
-/// change a user's address.
 #[derive(FromArgs, PartialEq, Eq, Debug)]
+/// Migrate towards influxdb and rpc_accounting_v2 from rpc_accounting
 #[argh(subcommand, name = "migrate_stats_to_v2")]
 pub struct MigrateStatsToV2 {}
 
-// I mean drop(sender) and then important_background_handle.await. No need for shutdown signal here I think.
-// Don't make data lossy
-
 impl MigrateStatsToV2 {
     pub async fn main(
         self,
         top_config: TopConfig,
         db_conn: &DatabaseConnection,
     ) -> anyhow::Result<()> {
-        // Also add influxdb container ...
-        // let mut spawned_app =
-        //     Web3ProxyApp::spawn(top_config.clone(), 2, app_shutdown_sender.clone()).await?;
-
-        let number_of_rows_to_process_at_once = 500;
+        let number_of_rows_to_process_at_once = 2000;
 
         // we wouldn't really need this, but let's spawn this anyways
         // easier than debugging the rest I suppose
@@ -121,31 +115,26 @@ impl MigrateStatsToV2 {
             None
         };
 
-        // Basically spawn the full app, look at web3_proxy CLI
+        let migration_timestamp = chrono::offset::Utc::now();
 
-        while true {
+        // Iterate over rows that were not marked as "migrated" yet and process them
+        loop {
             // (1) Load a batch of rows out of the old table until no more rows are left
             let old_records = rpc_accounting::Entity::find()
                 .filter(rpc_accounting::Column::Migrated.is_null())
                 .limit(number_of_rows_to_process_at_once)
+                .order_by_asc(rpc_accounting::Column::Id)
                 .all(db_conn)
                 .await?;
             if old_records.len() == 0 {
                 // Break out of while loop once all records have successfully been migrated ...
-                warn!("All records seem to have been successfully migrated!");
+                info!("All records seem to have been successfully migrated!");
                 break;
             }
 
             // (2) Create request metadata objects to match the old data
             // Iterate through all old rows, and put them into the above objects.
             for x in old_records.iter() {
-                // info!("Preparing for migration: {:?}", x);
-
-                // TODO: Split up a single request into multiple requests ...
-                // according to frontend-requests, backend-requests, etc.
-
-                // Get the rpc-key from the rpc_key_id
-                // Get the user-id from the rpc_key_id
                 let authorization_checks = match x.rpc_key_id {
                     Some(rpc_key_id) => {
                         let rpc_key_obj = rpc_key::Entity::find()
@@ -166,13 +155,10 @@ impl MigrateStatsToV2 {
                             ..Default::default()
                         }
                     }
-                    None => AuthorizationChecks {
-                        ..Default::default()
-                    },
+                    None => Default::default(),
                 };
 
-                // Then overwrite rpc_key_id and user_id (?)
-                let authorization_type = AuthorizationType::Frontend;
+                let authorization_type = AuthorizationType::Internal;
                 let authorization = Arc::new(
                     Authorization::try_new(
                         authorization_checks,
@@ -246,7 +232,7 @@ impl MigrateStatsToV2 {
                 // Modify the timestamps ..
                 response_stat.modify_struct(
                     int_response_millis,
-                    x.period_datetime.timestamp(), // I suppose timestamp is millis as well ... should check this in the (prod) database
+                    x.period_datetime.timestamp(),
                     int_backend_requests,
                 );
                 // info!("Sending stats: {:?}", response_stat);
@@ -255,18 +241,10 @@ impl MigrateStatsToV2 {
                         .send_async(response_stat.into())
                         .await
                         .context("stat_sender sending response_stat")?;
-                    // info!("Send! {:?}", stat_sender);
                 } else {
                     panic!("Stat sender was not spawned!");
                 }
-
-                // Create a new stats object
-                // Add it to the StatBuffer
             }
-
-            // Let the stat_sender spawn / flush ...
-            // spawned_app.app.stat_sender.aggregate_and_save_loop()
-            // Send a signal to save ...
         }
 
         // (3) Await that all items are properly processed
@@ -284,7 +262,7 @@ impl MigrateStatsToV2 {
                 .col_expr(
                     rpc_accounting::Column::Migrated,
                     Expr::value(Value::ChronoDateTimeUtc(Some(Box::new(
-                        chrono::offset::Utc::now(),
+                        migration_timestamp,
                     )))),
                 )
                 .filter(rpc_accounting::Column::Id.is_in(old_record_ids))
                 .exec(db_conn)
                 .await?;
 
            info!("Update result is: {:?}", update_result);
-
-            // (N-1) Mark the batch as migrated
-            // break;
        }
 
        info!(
            "Background handles are: {:?}",
            important_background_handles
        );
 
-        // Drop the handle
-        // Send the shutdown signal here (?)
-        // important_background_handles.clear();
-
-        // Finally also send a shutdown signal
-
        drop(stat_sender);
        // match app_shutdown_sender.send(()) {
        //     Err(x) => {
        //         panic!("Could not send shutdown signal! {:?}", x);
        //     }
        //     _ => {}
        // };
 
-        // TODO: Should we also write a short verifier if the migration was successful (?)
-
        // Drop the background handle, wait for any tasks that are on-going
        while let Some(x) = important_background_handles.next().await {
            info!("Returned item is: {:?}", x);
            }
        }
-
-        // info!("Here (?)");
-
        Ok(())
    }
 }
diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs
index 4ab6d66f..39577c62 100644
--- a/web3_proxy/src/frontend/authorization.rs
+++ b/web3_proxy/src/frontend/authorization.rs
@@ -613,7 +613,7 @@ impl Web3ProxyApp {
         proxy_mode: ProxyMode,
     ) -> anyhow::Result {
         // ip rate limits don't check referer or user agent
-        // the do check origin because we can override rate limits for some origins
+        // they do check origin because we can override rate limits for some origins
         let authorization = Authorization::external(
             allowed_origin_requests_per_period,
             self.db_conn.clone(),
diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs
index 3771e4dd..4544386f 100644
--- a/web3_proxy/src/rpcs/blockchain.rs
+++ b/web3_proxy/src/rpcs/blockchain.rs
@@ -102,8 +102,6 @@ impl Web3ProxyBlock {
         if block_timestamp < now {
             // this server is still syncing from too far away to serve requests
-            // u64 is safe because ew checked equality above
-            // (now - block_timestamp).as_secs()
             // u64 is safe because we checked equality above
             (now - block_timestamp) as u64
         } else {
@@ -346,9 +344,6 @@ impl Web3Rpcs {
         let request = json!({ "jsonrpc": "2.0", "id": "1", "method": "eth_getBlockByNumber", "params": (num, false) });
         let request: JsonRpcRequest = serde_json::from_value(request)?;
 
-        // TODO: if error, retry?
-        // TODO: request_metadata or authorization?
-        // we don't actually set min_block_needed here because all nodes have all blocks
         let response = self
             .try_send_best_consensus_head_connection(authorization, request, None, Some(num), None)
             .await?;
diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs
index 16295e25..0f6f5ffb 100644
--- a/web3_proxy/src/rpcs/many.rs
+++ b/web3_proxy/src/rpcs/many.rs
@@ -282,10 +282,6 @@ impl Web3Rpcs {
             })
             .collect();
 
-        // map of connection names to their connection
-        // let mut connections = HashMap::new();
-        // let mut handles = vec![];
-
         while let Some(x) = spawn_handles.next().await {
             match x {
                 Ok(Ok((rpc, _handle))) => {
@@ -366,12 +362,8 @@ impl Web3Rpcs {
         self.by_name.read().len()
     }
 
-    // <<<<<<< HEAD
     pub fn is_empty(&self) -> bool {
         self.by_name.read().is_empty()
-    // =======
-    //     Ok((connections, handle, consensus_connections_watcher))
-    // >>>>>>> 77df3fa (stats v2)
     }
 
     pub fn min_head_rpcs(&self) -> usize {
@@ -888,11 +880,7 @@ impl Web3Rpcs {
         // TODO: maximum retries? right now its the total number of servers
         loop {
-            // <<<<<<< HEAD
             if skip_rpcs.len() >= self.by_name.read().len() {
-            // =======
-            // if skip_rpcs.len() == self.by_name.len() {
-            // >>>>>>> 77df3fa (stats v2)
                 break;
             }
 
@@ -1173,18 +1161,8 @@ impl Web3Rpcs {
                     request_metadata.no_servers.fetch_add(1, Ordering::Release);
                 }
 
-                // <<<<<<< HEAD
                 watch_consensus_rpcs.changed().await?;
-
                 watch_consensus_rpcs.borrow_and_update();
-                // =======
-                // TODO: i don't think this will ever happen
-                // TODO: return a 502? if it does?
-                // return Err(anyhow::anyhow!("no available rpcs!"));
-                // TODO: sleep how long?
-                // TODO: subscribe to something in ConsensusWeb3Rpcs instead
-                sleep(Duration::from_millis(200)).await;
-                // >>>>>>> 77df3fa (stats v2)
 
                 continue;
             }
@@ -1285,11 +1263,12 @@ impl Serialize for Web3Rpcs {
 /// TODO: i think we still have sorts scattered around the code that should use this
 /// TODO: take AsRef or something like that? We don't need an Arc here
 fn rpc_sync_status_sort_key(x: &Arc) -> (Reverse, u64, bool, OrderedFloat) {
-    let head_block = x.head_block
-        .read()
-        .as_ref()
-        .map(|x| *x.number())
-        .unwrap_or_default();
+    let head_block = x
+        .head_block
+        .read()
+        .as_ref()
+        .map(|x| *x.number())
+        .unwrap_or_default();
 
     let tier = x.tier;
 
diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs
index 5b6322aa..03cf6834 100644
--- a/web3_proxy/src/rpcs/one.rs
+++ b/web3_proxy/src/rpcs/one.rs
@@ -9,9 +9,9 @@ use crate::rpcs::request::RequestErrorHandler;
 use anyhow::{anyhow, Context};
 use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64};
 use ethers::types::{Address, Transaction, U256};
-use futures::StreamExt;
 use futures::future::try_join_all;
 use futures::stream::FuturesUnordered;
+use futures::StreamExt;
 use log::{debug, error, info, trace, warn, Level};
 use migration::sea_orm::DatabaseConnection;
 use ordered_float::OrderedFloat;
@@ -243,12 +243,9 @@ impl Web3Rpc {
             block_data_limit,
             reconnect,
             tier: config.tier,
-// <<<<<<< HEAD
             disconnect_watch: Some(disconnect_sender),
             created_at: Some(created_at),
-// =======
             head_block: RwLock::new(Default::default()),
-// >>>>>>> 77df3fa (stats v2)
             ..Default::default()
         };
 
@@ -723,7 +720,7 @@ impl Web3Rpc {
         } else {
             RequestErrorHandler::ErrorLevel
         };
-        
+
         let mut delay_start = false;
 
         // this does loop. just only when reconnect is enabled
@@ -788,7 +785,6 @@ impl Web3Rpc {
                     let head_block = rpc.head_block.read().clone();
 
                     if let Some((block_number, txid)) = head_block.and_then(|x| {
-                        // let block = x.block;
                         let block = x.block.clone();
 
                         let block_number = block.number?;
@@ -912,7 +908,7 @@ impl Web3Rpc {
 
                 continue;
             }
-            
+
             // reconnect is not enabled.
             if *disconnect_receiver.borrow() {
                 info!("{} is disconnecting", self);
@@ -1174,7 +1170,9 @@ impl Web3Rpc {
         if self.should_disconnect() {
             Ok(())
         } else {
-            Err(anyhow!("pending_transactions subscription exited. reconnect needed"))
+            Err(anyhow!(
+                "pending_transactions subscription exited. reconnect needed"
+            ))
         }
     }
 
@@ -1251,14 +1249,8 @@ impl Web3Rpc {
         }
 
         if let Some(hard_limit_until) = self.hard_limit_until.as_ref() {
-// <<<<<<< HEAD
             let hard_limit_ready = *hard_limit_until.borrow();
-// =======
-//             let hard_limit_ready = hard_limit_until.borrow().to_owned();
-// >>>>>>> 77df3fa (stats v2)
-
             let now = Instant::now();
-
             if now < hard_limit_ready {
                 return Ok(OpenRequestResult::RetryAt(hard_limit_ready));
             }
diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs
index e713cc10..8b9b4da0 100644
--- a/web3_proxy/src/rpcs/request.rs
+++ b/web3_proxy/src/rpcs/request.rs
@@ -198,7 +198,11 @@ impl OpenRequestHandle {
         // TODO: replace ethers-rs providers with our own that supports streaming the responses
         let response = match provider.as_ref() {
             #[cfg(test)]
-            Web3Provider::Mock => return Err(ProviderError::CustomError("mock provider can't respond".to_string())),
+            Web3Provider::Mock => {
+                return Err(ProviderError::CustomError(
+                    "mock provider can't respond".to_string(),
+                ))
+            }
             Web3Provider::Ws(p) => p.request(method, params).await,
             Web3Provider::Http(p) | Web3Provider::Both(p, _) => {
                 // TODO: i keep hearing that http is faster. but ws has always been better for me. investigate more with actual benchmarks
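
Note on the shutdown handling settled on in migrate_stats_to_v2 above: rather than adding a dedicated shutdown signal, the stat channel's sender is dropped and the already-spawned background handles are then awaited, which is the pattern the removed review comment ("I mean drop(sender) and then important_background_handle.await") asks for. The following is a minimal, self-contained sketch of that pattern, not the project's actual code; the crate choices mirror the ones visible in the diff (flume, futures, tokio), and names such as fake_stat and the printed messages are illustrative only.

// Cargo.toml (assumed): tokio = { features = ["macros", "rt-multi-thread"] }, futures, flume
use futures::stream::FuturesUnordered;
use futures::StreamExt;

#[tokio::main]
async fn main() {
    // channel that stands in for the stat_sender / stat buffer pair
    let (stat_sender, stat_receiver) = flume::unbounded::<u64>();

    // background task that plays the role of the stat-saving loop
    let mut important_background_handles = FuturesUnordered::new();
    important_background_handles.push(tokio::spawn(async move {
        // recv_async only errors once every sender is dropped AND the queue is drained,
        // so dropping the sender doubles as the shutdown signal
        while let Ok(stat) = stat_receiver.recv_async().await {
            println!("aggregating stat {stat}");
        }
        println!("channel closed; flushing and exiting");
    }));

    // produce some work, like the per-row response_stat sends in the migration loop
    for fake_stat in 0..3u64 {
        stat_sender.send_async(fake_stat).await.unwrap();
    }

    // "drop(sender) and then await the background handle" -- no broadcast channel needed
    drop(stat_sender);

    while let Some(x) = important_background_handles.next().await {
        println!("background task finished: {x:?}");
    }
}

Because flume delivers all queued messages before reporting disconnection, dropping the sender is enough for the receiving task to flush and exit cleanly, which is why this one-shot migration command does not need the commented-out app_shutdown_sender broadcast.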