diff --git a/migration/src/m20230125_204810_stats_v2.rs b/migration/src/m20230125_204810_stats_v2.rs index 223acabe..7082fec0 100644 --- a/migration/src/m20230125_204810_stats_v2.rs +++ b/migration/src/m20230125_204810_stats_v2.rs @@ -136,13 +136,6 @@ impl MigrationTrait for Migration { } } -/// Partial table definition -#[derive(Iden)] -pub enum RpcKey { - Table, - Id, -} - #[derive(Iden)] enum RpcAccountingV2 { Table, diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index ed5ac64b..5576f0a6 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -581,8 +581,7 @@ impl Web3ProxyApp { .app .influxdb_bucket .clone() - .context("No influxdb bucket was provided")? - .to_owned(), + .context("No influxdb bucket was provided")?, db_conn.clone(), influxdb_client.clone(), 60, diff --git a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs index 4a7a4cee..bdd8350f 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs @@ -1,50 +1,25 @@ use anyhow::Context; use argh::FromArgs; -use chrono::{DateTime, Utc}; -use entities::{rpc_accounting, rpc_accounting_v2, rpc_key, user}; -use ethers::types::Address; +use entities::{rpc_accounting, rpc_key}; use futures::stream::FuturesUnordered; use futures::StreamExt; -use hashbrown::HashMap; -use log::{debug, error, info, trace, warn}; +use log::{error, info}; use migration::sea_orm::QueryOrder; use migration::sea_orm::{ - self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel, - QueryFilter, QuerySelect, UpdateResult, + ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect, UpdateResult, }; use migration::{Expr, Value}; -use std::mem::swap; use std::net::{IpAddr, Ipv4Addr}; use std::num::NonZeroU64; -use std::sync::{Arc, Mutex}; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::sync::Arc; use tokio::sync::broadcast; -use tokio::time::{sleep, Instant}; -use web3_proxy::app::{AuthorizationChecks, Web3ProxyApp, BILLING_PERIOD_SECONDS}; +use tokio::time::Instant; +use web3_proxy::app::{AuthorizationChecks, BILLING_PERIOD_SECONDS}; use web3_proxy::config::TopConfig; use web3_proxy::frontend::authorization::{ Authorization, AuthorizationType, RequestMetadata, RpcSecretKey, }; -use web3_proxy::stats::{BufferedRpcQueryStats, RpcQueryKey, RpcQueryStats, StatBuffer}; - -// Helper function to go from DateTime to Instant -fn datetime_utc_to_instant(datetime: DateTime) -> anyhow::Result { - let epoch = datetime.timestamp(); // Get the Unix timestamp - let nanos = datetime.timestamp_subsec_nanos(); - - let duration_since_epoch = Duration::new(epoch as u64, nanos); - // let duration_since_datetime = Duration::new(, nanos); - let instant_new = Instant::now(); - warn!("Instant new is: {:?}", instant_new); - let unix_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); - warn!("Instant since unix epoch is: {:?}", unix_epoch); - - instant_new - .checked_sub(unix_epoch) - .context("Could not subtract unix epoch from instant now")? - .checked_add(duration_since_epoch) - .context("Could not add duration since epoch for updated time") -} +use web3_proxy::stats::{RpcQueryStats, StatBuffer}; #[derive(FromArgs, PartialEq, Eq, Debug)] /// Migrate towards influxdb and rpc_accounting_v2 from rpc_accounting @@ -98,8 +73,7 @@ impl MigrateStatsToV2 { .app .influxdb_bucket .clone() - .context("No influxdb bucket was provided")? - .to_owned(), + .context("No influxdb bucket was provided")?, Some(db_conn.clone()), influxdb_client.clone(), 30, @@ -126,7 +100,7 @@ impl MigrateStatsToV2 { .order_by_asc(rpc_accounting::Column::Id) .all(db_conn) .await?; - if old_records.len() == 0 { + if old_records.is_empty() { // Break out of while loop once all records have successfully been migrated ... info!("All records seem to have been successfully migrated!"); break; @@ -181,32 +155,32 @@ impl MigrateStatsToV2 { // info!("Creating a new frontend request"); // Collect all requests here ... - let mut int_request_bytes = (x.sum_request_bytes / n); + let mut int_request_bytes = x.sum_request_bytes / n; if i == 0 { - int_request_bytes += (x.sum_request_bytes % n); + int_request_bytes += x.sum_request_bytes % n; } - let mut int_response_bytes = (x.sum_response_bytes / n); + let mut int_response_bytes = x.sum_response_bytes / n; if i == 0 { - int_response_bytes += (x.sum_response_bytes % n); + int_response_bytes += x.sum_response_bytes % n; } - let mut int_response_millis = (x.sum_response_millis / n); + let mut int_response_millis = x.sum_response_millis / n; if i == 0 { - int_response_millis += (x.sum_response_millis % n); + int_response_millis += x.sum_response_millis % n; } - let mut int_backend_requests = (x.backend_requests / n); + let mut int_backend_requests = x.backend_requests / n; if i == 0 { - int_backend_requests += (x.backend_requests % n); + int_backend_requests += x.backend_requests % n; } // Add module at the last step to include for any remained that we missed ... (?) // TODO: Create RequestMetadata let request_metadata = RequestMetadata { - start_instant: Instant::now(), // This is overwritten later on - request_bytes: int_request_bytes.into(), // Get the mean of all the request bytes + start_instant: Instant::now(), // This is overwritten later on + request_bytes: int_request_bytes, // Get the mean of all the request bytes archive_request: x.archive_request.into(), backend_requests: Default::default(), // This is not used, instead we modify the field later no_servers: 0.into(), // This is not relevant in the new version @@ -280,11 +254,8 @@ impl MigrateStatsToV2 { drop(stat_sender); - match app_shutdown_sender.send(()) { - Err(x) => { - panic!("Could not send shutdown signal! {:?}", x); - } - _ => {} + if let Err(x) = app_shutdown_sender.send(()) { + panic!("Could not send shutdown signal! {:?}", x); }; // Wait for any tasks that are on-going diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs index 4c74143c..b64d5fcb 100644 --- a/web3_proxy/src/frontend/status.rs +++ b/web3_proxy/src/frontend/status.rs @@ -7,8 +7,6 @@ use super::{FrontendHealthCache, FrontendJsonResponseCache, FrontendResponseCach use crate::app::{Web3ProxyApp, APP_USER_AGENT}; use axum::{http::StatusCode, response::IntoResponse, Extension, Json}; use axum_macros::debug_handler; -use hashbrown::HashMap; -use http::HeaderMap; use serde_json::json; use std::sync::Arc; diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 39271ee0..bcc6e0fb 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -14,7 +14,6 @@ use serde::ser::SerializeStruct; use serde::Serialize; use serde_json::json; use std::hash::Hash; -use std::time::{SystemTime, UNIX_EPOCH}; use std::{cmp::Ordering, fmt::Display, sync::Arc}; use tokio::sync::broadcast; use tokio::time::Duration; diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 6f3e6a2b..54eb2c1d 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -2,11 +2,11 @@ use super::blockchain::Web3ProxyBlock; use super::many::Web3Rpcs; use super::one::Web3Rpc; use crate::frontend::authorization::Authorization; -use crate::frontend::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; +use crate::frontend::errors::{Web3ProxyErrorContext, Web3ProxyResult}; use ethers::prelude::{H256, U64}; use hashbrown::{HashMap, HashSet}; use itertools::{Itertools, MinMaxResult}; -use log::{debug, trace, warn}; +use log::{trace, warn}; use moka::future::Cache; use serde::Serialize; use std::cmp::Reverse; diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index cc49561b..4c86f2a6 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -37,7 +37,6 @@ use std::{cmp, fmt}; use thread_fast_rng::rand::seq::SliceRandom; use tokio; use tokio::sync::{broadcast, watch}; -use tokio::task; use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior}; /// A collection of web3 connections. Sends requests either the current best server or all servers. diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index e0775768..6d896abb 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -11,7 +11,6 @@ use anyhow::{anyhow, Context}; use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64}; use ethers::types::{Address, Transaction, U256}; use futures::future::try_join_all; -use futures::stream::FuturesUnordered; use futures::StreamExt; use log::{debug, error, info, trace, warn, Level}; use migration::sea_orm::DatabaseConnection; @@ -786,7 +785,7 @@ impl Web3Rpc { let head_block = rpc.head_block.read().clone(); if let Some((block_number, txid)) = head_block.and_then(|x| { - let block = x.block.clone(); + let block = x.block; let block_number = block.number?; let txid = block.transactions.last().cloned()?; diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs index 1fa267b1..bb562009 100644 --- a/web3_proxy/src/stats/influxdb_queries.rs +++ b/web3_proxy/src/stats/influxdb_queries.rs @@ -15,7 +15,6 @@ use axum::{ Json, TypedHeader, }; use chrono::{DateTime, FixedOffset}; -use entities::{rpc_accounting, rpc_key}; use fstrings::{f, format_args_f}; use hashbrown::HashMap; use influxdb2::models::Query; @@ -140,11 +139,8 @@ pub async fn query_user_stats<'a>( let mut filter_chain_id = "".to_string(); // Add to group columns the method, if we want the detailed view as well - match stat_response_type { - StatType::Detailed => { - group_columns.push("method"); - } - _ => {} + if let StatType::Detailed = stat_response_type { + group_columns.push("method"); } if chain_id == 0 { @@ -249,7 +245,7 @@ pub async fn query_user_stats<'a>( influx_responses .into_iter() - .map(|x| (x._time.clone(), x)) + .map(|x| (x._time, x)) .into_group_map() .into_iter() .map(|(group, grouped_items)| { @@ -348,7 +344,7 @@ pub async fn query_user_stats<'a>( // Group by all fields together .. influx_responses .into_iter() - .map(|x| ((x._time.clone(), x.method.clone()), x)) + .map(|x| ((x._time, x.method.clone()), x)) .into_group_map() .into_iter() .map(|(group, grouped_items)| { @@ -464,7 +460,7 @@ pub async fn query_user_stats<'a>( if let Some(rpc_key_id) = params.get("rpc_key_id") { let rpc_key_id = rpc_key_id .parse::() - .map_err(|e| Web3ProxyError::BadRequest("Unable to parse rpc_key_id".to_string()))?; + .map_err(|_| Web3ProxyError::BadRequest("Unable to parse rpc_key_id".to_string()))?; response_body.insert("rpc_key_id", serde_json::Value::Number(rpc_key_id.into())); } diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index 5ecae74d..6b960219 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -231,7 +231,7 @@ impl BufferedRpcQueryStats { db_conn: &DatabaseConnection, key: RpcQueryKey, ) -> anyhow::Result<()> { - let period_datetime = Utc.timestamp_opt(key.response_timestamp as i64, 0).unwrap(); + let period_datetime = Utc.timestamp_opt(key.response_timestamp, 0).unwrap(); // this is a lot of variables let accounting_entry = rpc_accounting_v2::ActiveModel { @@ -406,6 +406,7 @@ impl RpcQueryStats { } impl StatBuffer { + #[allow(clippy::too_many_arguments)] pub fn try_spawn( chain_id: u64, bucket: String,