Anon rpc accounting (#194)

* cargo generate migrations didnt work without this

* anon rpc accounting

* Revert "anon rpc accounting"

This reverts commit f1b8cbcdeca0d5f9545da32f11dda6a90854f97a.

* 0 for anon in mysql

* only set approximate_balance_remaining on owned stats

* only set self.approximate_balance_remaining when adding if some

* todo complete
This commit is contained in:
Bryan Stitt 2023-07-24 22:02:33 -07:00 committed by GitHub
parent 9b361cb849
commit 05e618f74c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 62 additions and 46 deletions

1
Cargo.lock generated

@ -3333,6 +3333,7 @@ name = "migration"
version = "0.36.0"
dependencies = [
"chrono",
"sea-orm",
"sea-orm-migration",
"tokio",
]

@ -11,6 +11,7 @@ path = "src/lib.rs"
[dependencies]
tokio = { version = "1.29.1", features = ["full", "tracing"] }
chrono = "0.4.26"
sea-orm = { version = "0.11.3", features = ["with-chrono"]}
[dependencies.sea-orm-migration]
version = "0.11.3"

@ -133,8 +133,6 @@ pub enum AuthorizationType {
#[derive(Clone, Debug, Default, From)]
pub struct AuthorizationChecks {
/// database id of the primary user. 0 if anon
/// TODO: do we need this? its on the authorization so probably not
/// TODO: `Option<NonZeroU64>`? they are actual zeroes some places in the db now
pub user_id: u64,
/// locally cached balance that may drift slightly if the user is on multiple servers
pub latest_balance: Arc<AsyncRwLock<Balance>>,

@ -26,7 +26,6 @@ use num_traits::ToPrimitive;
use parking_lot::Mutex;
use std::borrow::Cow;
use std::mem;
use std::num::NonZeroU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tracing::{error, instrument, trace, warn};
@ -88,15 +87,15 @@ pub struct RpcQueryKey {
user_error_response: bool,
/// the rpc method used.
method: Cow<'static, str>,
/// None if the public url was used.
rpc_secret_key_id: Option<NonZeroU64>,
/// None if the public url was used.
rpc_key_user_id: Option<NonZeroU64>,
/// 0 if the public url was used.
rpc_secret_key_id: u64,
/// 0 if the public url was used.
rpc_key_user_id: u64,
}
impl RpcQueryKey {
pub fn is_registered(&self) -> bool {
self.rpc_key_user_id.is_some()
self.rpc_key_user_id != 0
}
}
@ -113,8 +112,15 @@ impl RpcQueryStats {
fn accounting_key(&self, period_seconds: i64) -> RpcQueryKey {
let response_timestamp = round_timestamp(self.response_timestamp, period_seconds);
// TODO: change this to use 0 for anonymous queries
let rpc_secret_key_id = self.authorization.checks.rpc_secret_key_id;
// it is very important that for anonymous users, rpc_secret_key_id is 0 and not NULL in the database
// for unique indexes, sql sees each NULL as a unique value!
// but we want them grouped!
let rpc_secret_key_id = self
.authorization
.checks
.rpc_secret_key_id
.map(u64::from)
.unwrap_or_default();
let method = self.method.clone();
@ -129,7 +135,7 @@ impl RpcQueryStats {
error_response: self.error_response,
method,
rpc_secret_key_id,
rpc_key_user_id: self.authorization.checks.user_id.try_into().ok(),
rpc_key_user_id: self.authorization.checks.user_id,
user_error_response,
}
}
@ -141,8 +147,8 @@ impl RpcQueryStats {
let method = self.method.clone();
// everyone gets grouped together
let rpc_secret_key_id = None;
let rpc_key_user_id = None;
let rpc_secret_key_id = 0;
let rpc_key_user_id = 0;
RpcQueryKey {
response_timestamp: self.response_timestamp,
@ -161,6 +167,13 @@ impl RpcQueryStats {
return None;
}
let rpc_secret_key_id = self
.authorization
.checks
.rpc_secret_key_id
.map(u64::from)
.unwrap_or_default();
let method = self.method.clone();
let key = RpcQueryKey {
@ -168,8 +181,8 @@ impl RpcQueryStats {
archive_needed: self.archive_request,
error_response: self.error_response,
method,
rpc_secret_key_id: self.authorization.checks.rpc_secret_key_id,
rpc_key_user_id: self.authorization.checks.user_id.try_into().ok(),
rpc_secret_key_id,
rpc_key_user_id: self.authorization.checks.user_id,
user_error_response: self.user_error_response,
};
@ -187,7 +200,7 @@ pub enum AppStat {
// TODO: move to stat_buffer.rs?
impl BufferedRpcQueryStats {
#[instrument(level = "trace")]
async fn add(&mut self, stat: RpcQueryStats, approximate_balance_remaining: Decimal) {
async fn add(&mut self, stat: RpcQueryStats, approximate_balance_remaining: Option<Decimal>) {
// a stat always come from just 1 frontend request
self.frontend_requests += 1;
@ -214,7 +227,10 @@ impl BufferedRpcQueryStats {
self.paid_credits_used += stat.compute_unit_cost;
}
self.approximate_balance_remaining = approximate_balance_remaining;
if approximate_balance_remaining.is_some() {
// notice that we overwrite. we intentionally do not increment!
self.approximate_balance_remaining = approximate_balance_remaining;
}
trace!("added");
}
@ -232,7 +248,8 @@ impl BufferedRpcQueryStats {
// =============================== //
let accounting_entry = rpc_accounting_v2::ActiveModel {
id: sea_orm::NotSet,
rpc_key_id: sea_orm::Set(key.rpc_secret_key_id.map(Into::into)),
// eventually rpc_key_id will be `NOT NULL`, but we have old data in the db to deal with
rpc_key_id: sea_orm::Set(Some(key.rpc_secret_key_id)),
chain_id: sea_orm::Set(chain_id),
period_datetime: sea_orm::Set(period_datetime),
archive_needed: sea_orm::Set(key.archive_needed),
@ -335,7 +352,7 @@ impl BufferedRpcQueryStats {
}
// TODO: rename to owner_id?
let sender_user_id = key.rpc_key_user_id.map_or(0, |x| x.get());
let sender_user_id = key.rpc_key_user_id;
// save the statistics to the database:
self._save_db_stats(chain_id, db_conn, &key).await?;
@ -479,12 +496,6 @@ impl BufferedRpcQueryStats {
.field("sum_request_bytes", self.sum_request_bytes as i64)
.field("sum_response_bytes", self.sum_response_bytes as i64)
.field("sum_response_millis", self.sum_response_millis as i64)
.field(
"balance",
self.approximate_balance_remaining
.to_f64()
.context("balance is really (too) large")?,
)
.field(
"sum_credits_used",
self.paid_credits_used
@ -498,9 +509,16 @@ impl BufferedRpcQueryStats {
.context("sum_credits_used is really (too) large")?,
);
if let Some(balance) = self.approximate_balance_remaining {
builder = builder.field(
"balance",
balance.to_f64().context("balance is really (too) large")?,
)
}
// TODO: set the rpc_secret_key_id tag to 0 when anon? will that make other queries easier?
if let Some(rpc_secret_key_id) = key.rpc_secret_key_id {
builder = builder.tag("rpc_secret_key_id", rpc_secret_key_id.to_string());
if key.rpc_secret_key_id != 0 {
builder = builder.tag("rpc_secret_key_id", key.rpc_secret_key_id.to_string());
}
// [add "uniq" to the timestamp](https://docs.influxdata.com/influxdb/v2.0/write-data/best-practices/duplicate-points/#increment-the-timestamp)

@ -31,7 +31,7 @@ pub struct BufferedRpcQueryStats {
/// The user's balance at this point in time.
/// Multiple queries might be modifying it at once, so this is a copy of it when received
/// None if this is an unauthenticated request
pub approximate_balance_remaining: Decimal,
pub approximate_balance_remaining: Option<Decimal>,
}
#[derive(From)]
@ -297,19 +297,17 @@ impl StatBuffer {
}
let accounting_key = stat.accounting_key(self.billing_period_seconds);
if accounting_key.is_registered() {
let span = tracing::trace_span!(
"accounting",
key = tracing::field::debug(&accounting_key)
)
.or_current();
self.accounting_db_buffer
.entry(accounting_key)
.or_default()
.add(stat.clone(), approximate_balance_remaining)
.instrument(span)
.await;
}
let span =
tracing::trace_span!("accounting", key = tracing::field::debug(&accounting_key))
.or_current();
self.accounting_db_buffer
.entry(accounting_key)
.or_default()
.add(stat.clone(), None)
.instrument(span)
.await;
}
if self.influxdb_client.is_some() {
@ -322,7 +320,7 @@ impl StatBuffer {
self.opt_in_timeseries_buffer
.entry(opt_in_timeseries_key)
.or_default()
.add(stat.clone(), approximate_balance_remaining)
.add(stat.clone(), Some(approximate_balance_remaining))
.instrument(span)
.await;
}
@ -336,7 +334,7 @@ impl StatBuffer {
self.global_timeseries_buffer
.entry(global_timeseries_key)
.or_default()
.add(stat, approximate_balance_remaining)
.add(stat, None)
.instrument(span)
.await;
}

@ -92,11 +92,11 @@ async fn test_sum_credits_used() {
// flush stats
let flushed = x.flush_stats().await.unwrap();
// TODO: this was 2 when we flushed stats for anon users. that was temporarily disabled. it should be turned back on once indexes are fixed
assert_eq!(flushed.relational, 1, "relational");
// TODO: how many should this actually be?
assert_eq!(flushed.relational, 2, "relational");
assert_eq!(flushed.timeseries, 1, "timeseries");
// TODO: sleep and then flush and make sure no more arrive
// Give user wallet $1000
admin_increase_balance(&x, &r, &admin_login_response, &user_wallet, 1000.into()).await;