diff --git a/Cargo.lock b/Cargo.lock index 76ea43c2..fba965db 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1678,7 +1678,7 @@ dependencies = [ [[package]] name = "entities" -version = "0.32.0" +version = "0.33.0" dependencies = [ "ethers", "sea-orm", @@ -3354,7 +3354,7 @@ dependencies = [ [[package]] name = "migration" -version = "0.32.0" +version = "0.33.0" dependencies = [ "sea-orm-migration", "tokio", diff --git a/README.md b/README.md index e51c050e..d2c940cd 100644 --- a/README.md +++ b/README.md @@ -133,6 +133,7 @@ cargo install sea-orm-cli 3. `sea-orm-cli generate entity -u mysql://root:dev_web3_proxy@127.0.0.1:13306/dev_web3_proxy -o entities/src --with-serde both` - Be careful when adding the `--tables THE,MODIFIED,TABLES` flag. It will delete relationships if they aren't listed 4. After running the above, you will need to manually fix some things + - Add any derives that got removed (like `Default`) - `Vec` -> `sea_orm::prelude::Uuid` (Related: ) - `i8` -> `bool` (Related: ) - add all the tables back into `mod.rs` diff --git a/entities/Cargo.toml b/entities/Cargo.toml index eb218433..c3c20319 100644 --- a/entities/Cargo.toml +++ b/entities/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "entities" -version = "0.32.0" +version = "0.33.0" edition = "2021" [lib] diff --git a/entities/src/admin.rs b/entities/src/admin.rs index adf88d0a..3b8dc7df 100644 --- a/entities/src/admin.rs +++ b/entities/src/admin.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3 use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; diff --git a/entities/src/admin_increase_balance_receipt.rs b/entities/src/admin_increase_balance_receipt.rs index 76dbda95..999c9076 100644 --- a/entities/src/admin_increase_balance_receipt.rs +++ b/entities/src/admin_increase_balance_receipt.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3 use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; diff --git a/entities/src/admin_trail.rs b/entities/src/admin_trail.rs index fe7348d5..185f75d7 100644 --- a/entities/src/admin_trail.rs +++ b/entities/src/admin_trail.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3 use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; @@ -10,7 +10,9 @@ pub struct Model { pub id: i32, pub caller: u64, pub imitating_user: Option, + #[sea_orm(column_type = "Text")] pub endpoint: String, + #[sea_orm(column_type = "Text")] pub payload: String, pub timestamp: DateTimeUtc, } diff --git a/entities/src/balance.rs b/entities/src/balance.rs index a888792e..e0d7a39c 100644 --- a/entities/src/balance.rs +++ b/entities/src/balance.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3 use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; diff --git a/entities/src/increase_on_chain_balance_receipt.rs b/entities/src/increase_on_chain_balance_receipt.rs index d547fc69..29e76d05 100644 --- a/entities/src/increase_on_chain_balance_receipt.rs +++ b/entities/src/increase_on_chain_balance_receipt.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7 +//! `SeaORM` Entity. 
Generated by sea-orm-codegen 0.11.3 use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; diff --git a/entities/src/login.rs b/entities/src/login.rs index 92c2df5e..83d04cf6 100644 --- a/entities/src/login.rs +++ b/entities/src/login.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3 use crate::serialization; use sea_orm::entity::prelude::*; @@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize}; pub struct Model { #[sea_orm(primary_key)] pub id: u64, - #[sea_orm(unique)] + #[sea_orm(column_type = "Binary(BlobSize::Blob(Some(16)))", unique)] #[serde(serialize_with = "serialization::uuid_as_ulid")] pub bearer_token: Uuid, pub user_id: u64, diff --git a/entities/src/mod.rs b/entities/src/mod.rs index b67a973e..4f8c1289 100644 --- a/entities/src/mod.rs +++ b/entities/src/mod.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3 pub mod prelude; diff --git a/entities/src/pending_login.rs b/entities/src/pending_login.rs index 6c701fca..6e99d842 100644 --- a/entities/src/pending_login.rs +++ b/entities/src/pending_login.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3 use crate::serialization; use sea_orm::entity::prelude::*; @@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize}; pub struct Model { #[sea_orm(primary_key)] pub id: u64, - #[sea_orm(unique)] + #[sea_orm(column_type = "Binary(BlobSize::Blob(Some(16)))", unique)] #[serde(serialize_with = "serialization::uuid_as_ulid")] pub nonce: Uuid, #[sea_orm(column_type = "Text")] diff --git a/entities/src/prelude.rs b/entities/src/prelude.rs index 20b764f3..821e997f 100644 --- a/entities/src/prelude.rs +++ b/entities/src/prelude.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3 pub use super::admin::Entity as Admin; pub use super::admin_increase_balance_receipt::Entity as AdminIncreaseBalanceReceipt; @@ -14,5 +14,6 @@ pub use super::rpc_accounting::Entity as RpcAccounting; pub use super::rpc_accounting_v2::Entity as RpcAccountingV2; pub use super::rpc_key::Entity as RpcKey; pub use super::secondary_user::Entity as SecondaryUser; +pub use super::stripe_increase_balance_receipt::Entity as StripeIncreaseBalanceReceipt; pub use super::user::Entity as User; pub use super::user_tier::Entity as UserTier; diff --git a/entities/src/referee.rs b/entities/src/referee.rs index cf8ee3c3..018527e7 100644 --- a/entities/src/referee.rs +++ b/entities/src/referee.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3 use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; diff --git a/entities/src/referrer.rs b/entities/src/referrer.rs index 7cbd3b98..8b959ac5 100644 --- a/entities/src/referrer.rs +++ b/entities/src/referrer.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3 use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; diff --git a/entities/src/revert_log.rs b/entities/src/revert_log.rs index 20cfc4e9..8f96ed9f 100644 --- a/entities/src/revert_log.rs +++ b/entities/src/revert_log.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7 +//! `SeaORM` Entity. 
Generated by sea-orm-codegen 0.11.3 use super::sea_orm_active_enums::Method; use crate::serialization; @@ -13,6 +13,7 @@ pub struct Model { pub rpc_key_id: u64, pub timestamp: DateTimeUtc, pub method: Method, + #[sea_orm(column_type = "Binary(BlobSize::Blob(Some(20)))")] #[serde(serialize_with = "serialization::vec_as_address")] pub to: Vec, #[sea_orm(column_type = "Text", nullable)] diff --git a/entities/src/rpc_accounting.rs b/entities/src/rpc_accounting.rs index fac4d09f..7051edaa 100644 --- a/entities/src/rpc_accounting.rs +++ b/entities/src/rpc_accounting.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3 use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; @@ -19,6 +19,7 @@ pub struct Model { pub cache_hits: u64, pub sum_request_bytes: u64, pub min_request_bytes: u64, + #[sea_orm(column_type = "Double")] pub mean_request_bytes: f64, pub p50_request_bytes: u64, pub p90_request_bytes: u64, @@ -26,6 +27,7 @@ pub struct Model { pub max_request_bytes: u64, pub sum_response_millis: u64, pub min_response_millis: u64, + #[sea_orm(column_type = "Double")] pub mean_response_millis: f64, pub p50_response_millis: u64, pub p90_response_millis: u64, @@ -33,6 +35,7 @@ pub struct Model { pub max_response_millis: u64, pub sum_response_bytes: u64, pub min_response_bytes: u64, + #[sea_orm(column_type = "Double")] pub mean_response_bytes: f64, pub p50_response_bytes: u64, pub p90_response_bytes: u64, diff --git a/entities/src/rpc_accounting_v2.rs b/entities/src/rpc_accounting_v2.rs index d3cc8cb6..324154c4 100644 --- a/entities/src/rpc_accounting_v2.rs +++ b/entities/src/rpc_accounting_v2.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3 use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; diff --git a/entities/src/rpc_key.rs b/entities/src/rpc_key.rs index 8d35bf48..6a202b52 100644 --- a/entities/src/rpc_key.rs +++ b/entities/src/rpc_key.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3 use crate::serialization; use sea_orm::entity::prelude::*; @@ -10,7 +10,7 @@ pub struct Model { #[sea_orm(primary_key)] pub id: u64, pub user_id: u64, - #[sea_orm(unique)] + #[sea_orm(column_type = "Binary(BlobSize::Blob(Some(16)))", unique)] #[serde(serialize_with = "serialization::uuid_as_ulid")] pub secret_key: Uuid, pub description: Option, @@ -24,6 +24,7 @@ pub struct Model { pub allowed_referers: Option, #[sea_orm(column_type = "Text", nullable)] pub allowed_user_agents: Option, + #[sea_orm(column_type = "Double")] pub log_revert_chance: f64, } diff --git a/entities/src/sea_orm_active_enums.rs b/entities/src/sea_orm_active_enums.rs index 00fb46aa..99c5565b 100644 --- a/entities/src/sea_orm_active_enums.rs +++ b/entities/src/sea_orm_active_enums.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3 use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; diff --git a/entities/src/secondary_user.rs b/entities/src/secondary_user.rs index 15b44f70..4efa6f46 100644 --- a/entities/src/secondary_user.rs +++ b/entities/src/secondary_user.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7 +//! `SeaORM` Entity. 
Generated by sea-orm-codegen 0.11.3 use super::sea_orm_active_enums::Role; use sea_orm::entity::prelude::*; diff --git a/entities/src/stripe_increase_balance_receipt.rs b/entities/src/stripe_increase_balance_receipt.rs index b287353a..71eb2608 100644 --- a/entities/src/stripe_increase_balance_receipt.rs +++ b/entities/src/stripe_increase_balance_receipt.rs @@ -1,15 +1,15 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.6 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3 use sea_orm::entity::prelude::*; -use serde::Serialize; +use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "stripe_increase_balance_receipt")] pub struct Model { #[sea_orm(primary_key)] pub id: u64, - pub deposit_to_user_id: Option, pub stripe_payment_intend_id: String, + pub deposit_to_user_id: Option, #[sea_orm(column_type = "Decimal(Some((20, 10)))")] pub amount: Decimal, pub currency: String, diff --git a/entities/src/user.rs b/entities/src/user.rs index 71ca1101..da9432c9 100644 --- a/entities/src/user.rs +++ b/entities/src/user.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3 use crate::serialization; use sea_orm::entity::prelude::*; @@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize}; pub struct Model { #[sea_orm(primary_key)] pub id: u64, - #[sea_orm(unique)] + #[sea_orm(column_type = "Binary(BlobSize::Blob(Some(20)))", unique)] #[serde(serialize_with = "serialization::vec_as_address")] pub address: Vec, pub description: Option, @@ -19,14 +19,28 @@ pub struct Model { #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] pub enum Relation { - #[sea_orm(has_many = "super::login::Entity")] - Login, - #[sea_orm(has_many = "super::rpc_key::Entity")] - RpcKey, + #[sea_orm(has_one = "super::admin::Entity")] + Admin, + #[sea_orm(has_many = "super::admin_increase_balance_receipt::Entity")] + AdminIncreaseBalanceReceipt, + #[sea_orm(has_one = "super::balance::Entity")] + Balance, #[sea_orm(has_many = "super::increase_on_chain_balance_receipt::Entity")] IncreaseOnChainBalanceReceipt, + #[sea_orm(has_many = "super::login::Entity")] + Login, + #[sea_orm(has_many = "super::pending_login::Entity")] + PendingLogin, + #[sea_orm(has_one = "super::referee::Entity")] + Referee, + #[sea_orm(has_one = "super::referrer::Entity")] + Referrer, + #[sea_orm(has_many = "super::rpc_key::Entity")] + RpcKey, #[sea_orm(has_many = "super::secondary_user::Entity")] SecondaryUser, + #[sea_orm(has_many = "super::stripe_increase_balance_receipt::Entity")] + StripeIncreaseBalanceReceipt, #[sea_orm( belongs_to = "super::user_tier::Entity", from = "Column::UserTierId", @@ -37,15 +51,21 @@ pub enum Relation { UserTier, } -impl Related for Entity { +impl Related for Entity { fn to() -> RelationDef { - Relation::Login.def() + Relation::Admin.def() } } -impl Related for Entity { +impl Related for Entity { fn to() -> RelationDef { - Relation::RpcKey.def() + Relation::AdminIncreaseBalanceReceipt.def() + } +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Balance.def() } } @@ -55,12 +75,48 @@ impl Related for Entity { } } +impl Related for Entity { + fn to() -> RelationDef { + Relation::Login.def() + } +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::PendingLogin.def() + } +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Referee.def() 
+ } +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Referrer.def() + } +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::RpcKey.def() + } +} + impl Related for Entity { fn to() -> RelationDef { Relation::SecondaryUser.def() } } +impl Related for Entity { + fn to() -> RelationDef { + Relation::StripeIncreaseBalanceReceipt.def() + } +} + impl Related for Entity { fn to() -> RelationDef { Relation::UserTier.def() diff --git a/entities/src/user_tier.rs b/entities/src/user_tier.rs index d0409f23..4117b93c 100644 --- a/entities/src/user_tier.rs +++ b/entities/src/user_tier.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3 use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; diff --git a/migration/Cargo.toml b/migration/Cargo.toml index ed1e6145..e6b10217 100644 --- a/migration/Cargo.toml +++ b/migration/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "migration" -version = "0.32.0" +version = "0.33.0" edition = "2021" publish = false diff --git a/migration/src/lib.rs b/migration/src/lib.rs index 0419e1d2..57c09c97 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -33,6 +33,7 @@ mod m20230615_221201_handle_payment_uncles; mod m20230618_230611_longer_payload; mod m20230619_172237_default_tracking; mod m20230622_104142_stripe_deposits; +mod m20230705_214013_type_fixes; pub struct Migrator; @@ -73,6 +74,7 @@ impl MigratorTrait for Migrator { Box::new(m20230618_230611_longer_payload::Migration), Box::new(m20230619_172237_default_tracking::Migration), Box::new(m20230622_104142_stripe_deposits::Migration), + Box::new(m20230705_214013_type_fixes::Migration), ] } } diff --git a/migration/src/m20230705_214013_type_fixes.rs b/migration/src/m20230705_214013_type_fixes.rs new file mode 100644 index 00000000..36d4d1f0 --- /dev/null +++ b/migration/src/m20230705_214013_type_fixes.rs @@ -0,0 +1,46 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Replace the sample below with your own migration scripts + manager + .alter_table( + Table::alter() + .table(IncreaseOnChainBalanceReceipt::Table) + .modify_column( + ColumnDef::new(IncreaseOnChainBalanceReceipt::LogIndex) + .big_unsigned() + .not_null(), + ) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(IncreaseOnChainBalanceReceipt::Table) + .modify_column( + ColumnDef::new(IncreaseOnChainBalanceReceipt::LogIndex) + .big_integer() + .unsigned() + .not_null(), + ) + .to_owned(), + ) + .await + } +} + +/// Learn more at https://docs.rs/sea-query#iden +#[derive(Iden)] +enum IncreaseOnChainBalanceReceipt { + Table, + LogIndex, +} diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index c528c443..113c40d5 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -27,7 +27,6 @@ use anyhow::Context; use axum::http::StatusCode; use chrono::Utc; use deferred_rate_limiter::DeferredRateLimiter; -use derive_more::From; use entities::user; use ethers::core::utils::keccak256; use ethers::prelude::{Address, Bytes, Transaction, TxHash, H256, U64}; @@ -165,7 +164,6 @@ pub async fn flatten_handles( } /// starting an app creates many tasks -#[derive(From)] pub struct Web3ProxyAppSpawn { /// the app. 
probably clone this to use in other groups of handles pub app: Arc, @@ -187,6 +185,7 @@ impl Web3ProxyApp { top_config: TopConfig, num_workers: usize, shutdown_sender: broadcast::Sender<()>, + flush_stat_buffer_receiver: broadcast::Receiver<()> ) -> anyhow::Result { let stat_buffer_shutdown_receiver = shutdown_sender.subscribe(); let mut background_shutdown_receiver = shutdown_sender.subscribe(); @@ -399,6 +398,7 @@ impl Web3ProxyApp { Some(user_balance_cache.clone()), stat_buffer_shutdown_receiver, 1, + flush_stat_buffer_receiver, )? { // since the database entries are used for accounting, we want to be sure everything is saved before exiting important_background_handles.push(spawned_stat_buffer.background_handle); @@ -653,14 +653,13 @@ impl Web3ProxyApp { important_background_handles.push(f); } - Ok(( + Ok(Web3ProxyAppSpawn { app, app_handles, - important_background_handles, - new_top_config_sender, - consensus_connections_watcher, - ) - .into()) + background_handles: important_background_handles, + new_top_config: new_top_config_sender, + ranked_rpcs: consensus_connections_watcher, + }) } pub async fn apply_top_config(&self, new_top_config: TopConfig) -> Web3ProxyResult<()> { diff --git a/web3_proxy/src/compute_units.rs b/web3_proxy/src/compute_units.rs index 61752291..a82a8ee4 100644 --- a/web3_proxy/src/compute_units.rs +++ b/web3_proxy/src/compute_units.rs @@ -135,8 +135,16 @@ impl ComputeUnit { /// Compute cost per request /// All methods cost the same /// The number of bytes are based on input, and output bytes - pub fn cost(&self, archive_request: bool, cache_hit: bool, usd_per_cu: Decimal) -> Decimal { - // TODO: server errors are free. need to split server and user errors + pub fn cost( + &self, + archive_request: bool, + cache_hit: bool, + error_response: bool, + usd_per_cu: Decimal, + ) -> Decimal { + if error_response { + return 0.into(); + } let mut cost = self.0 * usd_per_cu; diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index c89874d2..80cf8f63 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -14,6 +14,7 @@ use axum::headers::{Header, Origin, Referer, UserAgent}; use chrono::Utc; use core::fmt; use deferred_rate_limiter::DeferredRateLimitResult; +use derivative::Derivative; use derive_more::From; use entities::{balance, login, rpc_key, user, user_tier}; use ethers::types::{Bytes, U64}; @@ -309,7 +310,8 @@ impl KafkaDebugLogger { } } -#[derive(Debug)] +#[derive(Debug, Derivative)] +#[derivative(Default)] pub struct RequestMetadata { /// TODO: set archive_request during the new instead of after /// TODO: this is more complex than "requires a block older than X height". different types of data can be pruned differently @@ -329,7 +331,8 @@ pub struct RequestMetadata { /// Instant that the request was received (or at least close to it) /// We use Instant and not timestamps to avoid problems with leap seconds and similar issues - pub start_instant: tokio::time::Instant, + #[derivative(Default(value = "Instant::now()"))] + pub start_instant: Instant, /// if this is empty, there was a cache_hit /// otherwise, it is populated with any rpc servers that were used by this request pub backend_requests: BackendRequests, @@ -348,6 +351,8 @@ pub struct RequestMetadata { /// True if the response required querying a backup RPC /// RPC aggregators that query multiple providers to compare response may use this header to ignore our response. 
pub response_from_backup_rpc: AtomicBool, + /// If the request is invalid or received a jsonrpc error response (excluding reverts) + pub user_error_response: AtomicBool, /// ProxyMode::Debug logs requests and responses with Kafka /// TODO: maybe this shouldn't be determined by ProxyMode. A request param should probably enable this @@ -363,30 +368,6 @@ impl Default for Authorization { } } -/// this is only implemented so that we can use `mem::take`. You probably shouldn't use this. -impl Default for RequestMetadata { - fn default() -> Self { - Self { - archive_request: Default::default(), - authorization: Default::default(), - backend_requests: Default::default(), - chain_id: Default::default(), - error_response: Default::default(), - kafka_debug_logger: Default::default(), - method: Default::default(), - no_servers: Default::default(), - request_bytes: Default::default(), - request_ulid: Default::default(), - response_bytes: Default::default(), - response_from_backup_rpc: Default::default(), - response_millis: Default::default(), - response_timestamp: Default::default(), - start_instant: Instant::now(), - stat_sender: Default::default(), - } - } -} - impl RequestMetadata { pub fn proxy_mode(&self) -> ProxyMode { self.authorization @@ -531,6 +512,7 @@ impl RequestMetadata { response_timestamp: 0.into(), start_instant: Instant::now(), stat_sender: app.stat_sender.clone(), + user_error_response: false.into(), }; Arc::new(x) diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index d756338a..7f683611 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -852,9 +852,11 @@ impl Web3Rpcs { request_metadata .response_from_backup_rpc .store(is_backup_response, Ordering::Release); - } - if let Some(request_metadata) = request_metadata { + request_metadata + .user_error_response + .store(false, Ordering::Release); + request_metadata .error_response .store(false, Ordering::Release); @@ -863,21 +865,29 @@ impl Web3Rpcs { return Ok(response); } Err(error) => { - // trace!(?response, "rpc error"); - - // TODO: separate tracking for jsonrpc error and web3 proxy error! - if let Some(request_metadata) = request_metadata { - request_metadata - .error_response - .store(true, Ordering::Release); - } - // TODO: if this is an error, do NOT return. continue to try on another server let error = match JsonRpcErrorData::try_from(&error) { - Ok(x) => x, + Ok(x) => { + if let Some(request_metadata) = request_metadata { + request_metadata + .user_error_response + .store(true, Ordering::Release); + } + x + } Err(err) => { warn!(?err, "error from {}", rpc); + if let Some(request_metadata) = request_metadata { + request_metadata + .error_response + .store(true, Ordering::Release); + + request_metadata + .user_error_response + .store(false, Ordering::Release); + } + last_provider_error = Some(error); continue; @@ -1012,19 +1022,27 @@ impl Web3Rpcs { } } OpenRequestResult::NotReady => { + if let Some(request_metadata) = request_metadata { + request_metadata + .error_response + .store(true, Ordering::Release); + } break; } } } - // TODO: do we need this here, or do we do it somewhere else? 
like, the code could change and a try operator in here would skip this increment - if let Some(request_metadata) = request_metadata { - request_metadata - .error_response - .store(true, Ordering::Release); - } - if let Some(err) = method_not_available_response { + if let Some(request_metadata) = request_metadata { + request_metadata + .error_response + .store(false, Ordering::Release); + + request_metadata + .user_error_response + .store(true, Ordering::Release); + } + // this error response is likely the user's fault // TODO: emit a stat for unsupported methods. then we can know what there is demand for or if we are missing a feature return Err(err.into()); diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs index 5cec8d73..ee97486b 100644 --- a/web3_proxy/src/stats/influxdb_queries.rs +++ b/web3_proxy/src/stats/influxdb_queries.rs @@ -263,6 +263,7 @@ pub async fn query_user_stats<'a>( "error_response", "method", "rpc_secret_key_id", + "user_error_response", ]"# } }; @@ -332,7 +333,7 @@ pub async fn query_user_stats<'a>( // TODO: lower log level debug!("Raw query to db is: {:#}", query); let query = Query::new(query.to_string()); - trace!("Query to db is: {:#?}", query); + trace!(?query, "influx"); // Make the query and collect all data let raw_influx_responses: Vec = influxdb_client @@ -575,7 +576,25 @@ pub async fn query_user_stats<'a>( ); } _ => { - error!("error_response should always be a Long!"); + error!("error_response should always be a String!"); + } + } + } else if stat_response_type == StatType::Detailed && key == "user_error_response" { + match value { + influxdb2_structmap::value::Value::String(inner) => { + out.insert( + "user_error_response", + if inner == "true" { + serde_json::Value::Bool(true) + } else if inner == "false" { + serde_json::Value::Bool(false) + } else { + serde_json::Value::String("error".to_owned()) + }, + ); + } + _ => { + error!("user_error_response should always be a String!"); } } } diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index 35f85a2b..c39287cb 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -29,7 +29,7 @@ use std::borrow::Cow; use std::mem; use std::num::NonZeroU64; use std::str::FromStr; -use std::sync::atomic::{self, Ordering}; +use std::sync::atomic::Ordering; use std::sync::Arc; use tracing::trace; @@ -61,6 +61,8 @@ pub struct RpcQueryStats { /// The cost of the query in USD /// If the user is on a free tier, this is still calculated so we know how much we are giving away. pub compute_unit_cost: Decimal, + /// If the request is invalid or received a jsonrpc error response (excluding reverts) + pub user_error_response: bool, } #[derive(Clone, Debug, From, Hash, PartialEq, Eq)] @@ -71,11 +73,13 @@ pub struct RpcQueryKey { response_timestamp: i64, /// true if an archive server was needed to serve the request. archive_needed: bool, - /// true if the response was some sort of JSONRPC error. + /// true if the response was some sort of application error. error_response: bool, + /// true if the response was some sort of JSONRPC error. + user_error_response: bool, /// the rpc method used. method: Cow<'static, str>, - /// origin tracking was opt-in. Now it is "None" + /// origin tracking **was** opt-in. Now, it is always "None" origin: Option, /// None if the public url was used. 
rpc_secret_key_id: Option, @@ -103,6 +107,9 @@ impl RpcQueryStats { // we used to optionally store origin, but wallets don't set it, so its almost always None let origin = None; + // user_error_response is always set to false because we don't bother tracking this in the database + let user_error_response = false; + // Depending on method, add some arithmetic around calculating credits_used // I think balance should not go here, this looks more like a key thingy RpcQueryKey { @@ -113,6 +120,7 @@ impl RpcQueryStats { rpc_secret_key_id, rpc_key_user_id: self.authorization.checks.user_id.try_into().ok(), origin, + user_error_response, } } @@ -133,6 +141,7 @@ impl RpcQueryStats { method, rpc_secret_key_id, rpc_key_user_id: self.authorization.checks.user_id.try_into().ok(), + user_error_response: self.user_error_response, origin, } } @@ -151,6 +160,7 @@ impl RpcQueryStats { method, rpc_secret_key_id: self.authorization.checks.rpc_secret_key_id, rpc_key_user_id: self.authorization.checks.user_id.try_into().ok(), + user_error_response: self.user_error_response, origin, }; @@ -732,6 +742,7 @@ impl BufferedRpcQueryStats { builder = builder .tag("archive_needed", key.archive_needed.to_string()) .tag("error_response", key.error_response.to_string()) + .tag("user_error_response", key.user_error_response.to_string()) .field("frontend_requests", self.frontend_requests as i64) .field("backend_requests", self.backend_requests as i64) .field("no_servers", self.no_servers as i64) @@ -784,9 +795,11 @@ impl TryFrom for RpcQueryStats { let response_bytes = metadata.response_bytes.load(Ordering::Acquire); let mut error_response = metadata.error_response.load(Ordering::Acquire); - let mut response_millis = metadata.response_millis.load(atomic::Ordering::Acquire); + let mut response_millis = metadata.response_millis.load(Ordering::Acquire); - let response_timestamp = match metadata.response_timestamp.load(atomic::Ordering::Acquire) { + let user_error_response = metadata.user_error_response.load(Ordering::Acquire); + + let response_timestamp = match metadata.response_timestamp.load(Ordering::Acquire) { 0 => { // no response timestamp! 
if !error_response { @@ -820,7 +833,7 @@ impl TryFrom for RpcQueryStats { let cache_hit = !backend_rpcs_used.is_empty(); - let compute_unit_cost = cu.cost(archive_request, cache_hit, usd_per_cu); + let compute_unit_cost = cu.cost(archive_request, cache_hit, error_response, usd_per_cu); let method = mem::take(&mut metadata.method); @@ -836,6 +849,7 @@ impl TryFrom for RpcQueryStats { response_bytes, response_millis, response_timestamp, + user_error_response, }; Ok(x) diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs index 2c099a0c..d454bb2e 100644 --- a/web3_proxy/src/stats/stat_buffer.rs +++ b/web3_proxy/src/stats/stat_buffer.rs @@ -64,6 +64,7 @@ impl StatBuffer { user_balance_cache: Option, shutdown_receiver: broadcast::Receiver<()>, tsdb_save_interval_seconds: u32, + flush_receiver: broadcast::Receiver<()>, ) -> anyhow::Result> { if db_conn.is_none() && influxdb_client.is_none() { return Ok(None); @@ -89,7 +90,7 @@ impl StatBuffer { // any errors inside this task will cause the application to exit let handle = tokio::spawn(async move { - new.aggregate_and_save_loop(bucket, stat_receiver, shutdown_receiver) + new.aggregate_and_save_loop(bucket, stat_receiver, shutdown_receiver, flush_receiver) .await }); @@ -101,6 +102,7 @@ impl StatBuffer { bucket: String, stat_receiver: flume::Receiver, mut shutdown_receiver: broadcast::Receiver<()>, + mut flush_receiver: broadcast::Receiver<()>, ) -> Web3ProxyResult<()> { let mut tsdb_save_interval = interval(Duration::from_secs(self.tsdb_save_interval_seconds as u64)); @@ -150,6 +152,19 @@ impl StatBuffer { trace!("Saved {} stats to the tsdb", count); } } + _ = flush_receiver.recv() => { + trace!("flush"); + + let count = self.save_tsdb_stats(&bucket).await; + if count > 0 { + trace!("Flushed {} stats to the tsdb", count); + } + + let count = self.save_relational_stats().await; + if count > 0 { + trace!("Flushed {} stats to the relational db", count); + } + } x = shutdown_receiver.recv() => { match x { Ok(_) => { diff --git a/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs b/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs index 6c3a98d8..9e83e17c 100644 --- a/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs +++ b/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs @@ -72,6 +72,8 @@ impl MigrateStatsToV2SubCommand { None => None, }; + let (_flush_sender, flush_receiver) = broadcast::channel(1); + // Spawn the stat-sender let emitter_spawn = StatBuffer::try_spawn( BILLING_PERIOD_SECONDS, @@ -88,6 +90,7 @@ impl MigrateStatsToV2SubCommand { None, rpc_account_shutdown_recevier, 1, + flush_receiver, ) .context("Error spawning stat buffer")? .context("No stat buffer spawned. Maybe missing influx or db credentials?")?; @@ -203,6 +206,7 @@ impl MigrateStatsToV2SubCommand { start_instant: Instant::now(), stat_sender: Some(stat_sender.clone()), request_ulid, + user_error_response: false.into(), }; if let Some(x) = request_metadata.try_send_stat()? 
{ diff --git a/web3_proxy/src/sub_commands/proxyd.rs b/web3_proxy/src/sub_commands/proxyd.rs index 1325392d..45e5d9e6 100644 --- a/web3_proxy/src/sub_commands/proxyd.rs +++ b/web3_proxy/src/sub_commands/proxyd.rs @@ -42,6 +42,7 @@ impl ProxydSubCommand { let frontend_port = Arc::new(self.port.into()); let prometheus_port = Arc::new(self.prometheus_port.into()); + let (flush_stat_buffer_sender, _) = broadcast::channel(1); Self::_main( top_config, @@ -50,6 +51,7 @@ impl ProxydSubCommand { prometheus_port, num_workers, shutdown_sender, + flush_stat_buffer_sender, ) .await } @@ -62,14 +64,11 @@ impl ProxydSubCommand { prometheus_port: Arc, num_workers: usize, frontend_shutdown_sender: broadcast::Sender<()>, + flush_stat_buffer_sender: broadcast::Sender<()>, ) -> anyhow::Result<()> { - // tokio has code for catching ctrl+c so we use that - // this shutdown sender is currently only used in tests, but we might make a /shutdown endpoint or something + // tokio has code for catching ctrl+c so we use that to shut down in most cases + // frontend_shutdown_sender is currently only used in tests, but we might make a /shutdown endpoint or something // we do not need this receiver. new receivers are made by `shutdown_sender.subscribe()` - - // TODO: should we use a watch or broadcast for these? - // Maybe this one ? - // let mut shutdown_receiver = shutdown_sender.subscribe(); let (app_shutdown_sender, _app_shutdown_receiver) = broadcast::channel(1); let frontend_shutdown_receiver = frontend_shutdown_sender.subscribe(); @@ -86,6 +85,7 @@ impl ProxydSubCommand { top_config.clone(), num_workers, app_shutdown_sender.clone(), + flush_stat_buffer_sender.subscribe(), ) .await?; diff --git a/web3_proxy/tests/common/app.rs b/web3_proxy/tests/common/app.rs index cf5a8ecd..4425e0f9 100644 --- a/web3_proxy/tests/common/app.rs +++ b/web3_proxy/tests/common/app.rs @@ -54,6 +54,9 @@ pub struct TestApp { /// connection to the proxy that is connected to anil. pub proxy_provider: Provider, + /// tell the app to flush stats to the database + flush_stat_buffer_sender: broadcast::Sender<()>, + /// tell the app to shut down (use `self.stop()`). shutdown_sender: broadcast::Sender<()>, } @@ -269,6 +272,8 @@ impl TestApp { let frontend_port_arc = Arc::new(AtomicU16::new(0)); let prometheus_port_arc = Arc::new(AtomicU16::new(0)); + let (flush_stat_buffer_sender, _flush_stat_buffer_receiver) = broadcast::channel(1); + // spawn the app // TODO: spawn in a thread so we can run from non-async tests and so the Drop impl can wait for it to stop let handle = { @@ -279,6 +284,7 @@ impl TestApp { prometheus_port_arc, num_workers, shutdown_sender.clone(), + flush_stat_buffer_sender.clone(), )) }; @@ -304,6 +310,7 @@ impl TestApp { db, proxy_handle: Mutex::new(Some(handle)), proxy_provider, + flush_stat_buffer_sender, shutdown_sender, } } @@ -313,6 +320,10 @@ impl TestApp { self.db.as_ref().unwrap().conn.as_ref().unwrap() } + pub fn flush_stats(&self) { + self.flush_stat_buffer_sender.send(()).unwrap(); + } + pub fn stop(&self) -> Result> { self.shutdown_sender.send(()) }
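
A minimal usage sketch of the stat-flush path this patch adds, as it might look from an integration test. Only `TestApp::flush_stats`, `TestApp::proxy_provider`, and the `save_tsdb_stats` / `save_relational_stats` behavior come from the diff above; the helper name, the module path in the import, and the choice of request are assumptions for illustration.

```rust
// Sketch only: exercise the proxy once, then ask the StatBuffer to persist what it
// has aggregated instead of waiting for the periodic save interval.
use ethers::providers::Middleware;

use crate::common::app::TestApp; // assumed path; adjust to however tests/common is mounted

async fn flush_after_one_request(x: &TestApp) -> anyhow::Result<()> {
    // any request works; it only needs to reach the app so a stat gets buffered
    let _head = x.proxy_provider.get_block_number().await?;

    // sends () on flush_stat_buffer_sender; the StatBuffer's select loop then runs
    // save_tsdb_stats and save_relational_stats. The send returns before those saves
    // finish, so a test may still need to sleep or poll before asserting on the db.
    // NOTE: flush_stats() unwraps the send and will panic if no StatBuffer was spawned
    // (for example, when the app was started without db or influx credentials).
    x.flush_stats();

    Ok(())
}
```

Because the flush is delivered over a broadcast channel with no acknowledgement, callers only get "a flush was requested", not "the stats are now visible"; anything that needs a guarantee still has to wait on the database side.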