diff --git a/Cargo.lock b/Cargo.lock index ecfb63ea..58ac7a45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3333,6 +3333,7 @@ name = "migration" version = "0.36.0" dependencies = [ "chrono", + "sea-orm", "sea-orm-migration", "tokio", ] diff --git a/migration/Cargo.toml b/migration/Cargo.toml index 00f780a8..14a480c0 100644 --- a/migration/Cargo.toml +++ b/migration/Cargo.toml @@ -11,6 +11,7 @@ path = "src/lib.rs" [dependencies] tokio = { version = "1.29.1", features = ["full", "tracing"] } chrono = "0.4.26" +sea-orm = { version = "0.11.3", features = ["with-chrono"]} [dependencies.sea-orm-migration] version = "0.11.3" diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index da93ff50..3b6c123b 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -133,8 +133,6 @@ pub enum AuthorizationType { #[derive(Clone, Debug, Default, From)] pub struct AuthorizationChecks { /// database id of the primary user. 0 if anon - /// TODO: do we need this? its on the authorization so probably not - /// TODO: `Option`? they are actual zeroes some places in the db now pub user_id: u64, /// locally cached balance that may drift slightly if the user is on multiple servers pub latest_balance: Arc>, diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index 4bdc7ee2..2e35ea1f 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -26,7 +26,6 @@ use num_traits::ToPrimitive; use parking_lot::Mutex; use std::borrow::Cow; use std::mem; -use std::num::NonZeroU64; use std::sync::atomic::Ordering; use std::sync::Arc; use tracing::{error, instrument, trace, warn}; @@ -88,15 +87,15 @@ pub struct RpcQueryKey { user_error_response: bool, /// the rpc method used. method: Cow<'static, str>, - /// None if the public url was used. - rpc_secret_key_id: Option, - /// None if the public url was used. - rpc_key_user_id: Option, + /// 0 if the public url was used. + rpc_secret_key_id: u64, + /// 0 if the public url was used. + rpc_key_user_id: u64, } impl RpcQueryKey { pub fn is_registered(&self) -> bool { - self.rpc_key_user_id.is_some() + self.rpc_key_user_id != 0 } } @@ -113,8 +112,15 @@ impl RpcQueryStats { fn accounting_key(&self, period_seconds: i64) -> RpcQueryKey { let response_timestamp = round_timestamp(self.response_timestamp, period_seconds); - // TODO: change this to use 0 for anonymous queries - let rpc_secret_key_id = self.authorization.checks.rpc_secret_key_id; + // it is very important that for anonymous users, rpc_secret_key_id is 0 and not NULL in the database + // for unique indexes, sql sees each NULL as a unique value! + // but we want them grouped! + let rpc_secret_key_id = self + .authorization + .checks + .rpc_secret_key_id + .map(u64::from) + .unwrap_or_default(); let method = self.method.clone(); @@ -129,7 +135,7 @@ impl RpcQueryStats { error_response: self.error_response, method, rpc_secret_key_id, - rpc_key_user_id: self.authorization.checks.user_id.try_into().ok(), + rpc_key_user_id: self.authorization.checks.user_id, user_error_response, } } @@ -141,8 +147,8 @@ impl RpcQueryStats { let method = self.method.clone(); // everyone gets grouped together - let rpc_secret_key_id = None; - let rpc_key_user_id = None; + let rpc_secret_key_id = 0; + let rpc_key_user_id = 0; RpcQueryKey { response_timestamp: self.response_timestamp, @@ -161,6 +167,13 @@ impl RpcQueryStats { return None; } + let rpc_secret_key_id = self + .authorization + .checks + .rpc_secret_key_id + .map(u64::from) + .unwrap_or_default(); + let method = self.method.clone(); let key = RpcQueryKey { @@ -168,8 +181,8 @@ impl RpcQueryStats { archive_needed: self.archive_request, error_response: self.error_response, method, - rpc_secret_key_id: self.authorization.checks.rpc_secret_key_id, - rpc_key_user_id: self.authorization.checks.user_id.try_into().ok(), + rpc_secret_key_id, + rpc_key_user_id: self.authorization.checks.user_id, user_error_response: self.user_error_response, }; @@ -187,7 +200,7 @@ pub enum AppStat { // TODO: move to stat_buffer.rs? impl BufferedRpcQueryStats { #[instrument(level = "trace")] - async fn add(&mut self, stat: RpcQueryStats, approximate_balance_remaining: Decimal) { + async fn add(&mut self, stat: RpcQueryStats, approximate_balance_remaining: Option) { // a stat always come from just 1 frontend request self.frontend_requests += 1; @@ -214,7 +227,10 @@ impl BufferedRpcQueryStats { self.paid_credits_used += stat.compute_unit_cost; } - self.approximate_balance_remaining = approximate_balance_remaining; + if approximate_balance_remaining.is_some() { + // notice that we overwrite. we intentionally do not increment! + self.approximate_balance_remaining = approximate_balance_remaining; + } trace!("added"); } @@ -232,7 +248,8 @@ impl BufferedRpcQueryStats { // =============================== // let accounting_entry = rpc_accounting_v2::ActiveModel { id: sea_orm::NotSet, - rpc_key_id: sea_orm::Set(key.rpc_secret_key_id.map(Into::into)), + // eventually rpc_key_id will be `NOT NULL`, but we have old data in the db to deal with + rpc_key_id: sea_orm::Set(Some(key.rpc_secret_key_id)), chain_id: sea_orm::Set(chain_id), period_datetime: sea_orm::Set(period_datetime), archive_needed: sea_orm::Set(key.archive_needed), @@ -335,7 +352,7 @@ impl BufferedRpcQueryStats { } // TODO: rename to owner_id? - let sender_user_id = key.rpc_key_user_id.map_or(0, |x| x.get()); + let sender_user_id = key.rpc_key_user_id; // save the statistics to the database: self._save_db_stats(chain_id, db_conn, &key).await?; @@ -479,12 +496,6 @@ impl BufferedRpcQueryStats { .field("sum_request_bytes", self.sum_request_bytes as i64) .field("sum_response_bytes", self.sum_response_bytes as i64) .field("sum_response_millis", self.sum_response_millis as i64) - .field( - "balance", - self.approximate_balance_remaining - .to_f64() - .context("balance is really (too) large")?, - ) .field( "sum_credits_used", self.paid_credits_used @@ -498,9 +509,16 @@ impl BufferedRpcQueryStats { .context("sum_credits_used is really (too) large")?, ); + if let Some(balance) = self.approximate_balance_remaining { + builder = builder.field( + "balance", + balance.to_f64().context("balance is really (too) large")?, + ) + } + // TODO: set the rpc_secret_key_id tag to 0 when anon? will that make other queries easier? - if let Some(rpc_secret_key_id) = key.rpc_secret_key_id { - builder = builder.tag("rpc_secret_key_id", rpc_secret_key_id.to_string()); + if key.rpc_secret_key_id != 0 { + builder = builder.tag("rpc_secret_key_id", key.rpc_secret_key_id.to_string()); } // [add "uniq" to the timestamp](https://docs.influxdata.com/influxdb/v2.0/write-data/best-practices/duplicate-points/#increment-the-timestamp) diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs index 75a564b0..18b360c2 100644 --- a/web3_proxy/src/stats/stat_buffer.rs +++ b/web3_proxy/src/stats/stat_buffer.rs @@ -31,7 +31,7 @@ pub struct BufferedRpcQueryStats { /// The user's balance at this point in time. /// Multiple queries might be modifying it at once, so this is a copy of it when received /// None if this is an unauthenticated request - pub approximate_balance_remaining: Decimal, + pub approximate_balance_remaining: Option, } #[derive(From)] @@ -297,19 +297,17 @@ impl StatBuffer { } let accounting_key = stat.accounting_key(self.billing_period_seconds); - if accounting_key.is_registered() { - let span = tracing::trace_span!( - "accounting", - key = tracing::field::debug(&accounting_key) - ) - .or_current(); - self.accounting_db_buffer - .entry(accounting_key) - .or_default() - .add(stat.clone(), approximate_balance_remaining) - .instrument(span) - .await; - } + + let span = + tracing::trace_span!("accounting", key = tracing::field::debug(&accounting_key)) + .or_current(); + + self.accounting_db_buffer + .entry(accounting_key) + .or_default() + .add(stat.clone(), None) + .instrument(span) + .await; } if self.influxdb_client.is_some() { @@ -322,7 +320,7 @@ impl StatBuffer { self.opt_in_timeseries_buffer .entry(opt_in_timeseries_key) .or_default() - .add(stat.clone(), approximate_balance_remaining) + .add(stat.clone(), Some(approximate_balance_remaining)) .instrument(span) .await; } @@ -336,7 +334,7 @@ impl StatBuffer { self.global_timeseries_buffer .entry(global_timeseries_key) .or_default() - .add(stat, approximate_balance_remaining) + .add(stat, None) .instrument(span) .await; } diff --git a/web3_proxy/tests/test_sum_credits_used.rs b/web3_proxy/tests/test_sum_credits_used.rs index 1401a045..30508ee1 100644 --- a/web3_proxy/tests/test_sum_credits_used.rs +++ b/web3_proxy/tests/test_sum_credits_used.rs @@ -92,11 +92,11 @@ async fn test_sum_credits_used() { // flush stats let flushed = x.flush_stats().await.unwrap(); - // TODO: this was 2 when we flushed stats for anon users. that was temporarily disabled. it should be turned back on once indexes are fixed - assert_eq!(flushed.relational, 1, "relational"); - // TODO: how many should this actually be? + assert_eq!(flushed.relational, 2, "relational"); assert_eq!(flushed.timeseries, 1, "timeseries"); + // TODO: sleep and then flush and make sure no more arrive + // Give user wallet $1000 admin_increase_balance(&x, &r, &admin_login_response, &user_wallet, 1000.into()).await;