diff --git a/web3_proxy/src/errors.rs b/web3_proxy/src/errors.rs index d262b3ec..ac5d75f6 100644 --- a/web3_proxy/src/errors.rs +++ b/web3_proxy/src/errors.rs @@ -22,7 +22,6 @@ use reqwest::header::ToStrError; use rust_decimal::Error as DecimalError; use serde::Serialize; use serde_json::value::RawValue; -use std::error::Error; use std::sync::Arc; use std::{borrow::Cow, net::IpAddr}; use tokio::{sync::AcquireError, task::JoinError, time::Instant}; diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index d201d248..99c70625 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -24,7 +24,8 @@ use http::HeaderValue; use ipnet::IpNet; use log::{error, trace, warn}; use migration::sea_orm::prelude::Decimal; -use migration::sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter}; +use migration::sea_orm::{self, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter}; +use migration::OnConflict; use parking_lot::RwLock; use rdkafka::message::{Header as KafkaHeader, OwnedHeaders as KafkaOwnedHeaders, OwnedMessage}; use rdkafka::producer::{FutureProducer, FutureRecord}; @@ -72,7 +73,9 @@ pub enum AuthorizationType { #[derive(Clone, Debug, Default)] pub struct Balance { + /// The total USD value deposited. pub total_deposit: Decimal, + /// The amount spent outside the free tier. pub total_spend: Decimal, } @@ -1187,21 +1190,41 @@ impl Web3ProxyApp { .try_get_with(x, async move { let db_replica = self .db_replica() - .web3_context("Getting database connection")?; + .web3_context("Getting database replica connection")?; - let balance = match balance::Entity::find() - .filter(balance::Column::UserId.eq(user_id)) - .one(db_replica.as_ref()) - .await? - { - Some(x) => Balance { - total_deposit: x.total_deposits, - total_spend: x.total_spent_outside_free_tier, - }, - None => Default::default(), - }; + loop { + match balance::Entity::find() + .filter(balance::Column::UserId.eq(user_id)) + .one(db_replica.as_ref()) + .await? + { + Some(x) => { + let x = Balance { + total_deposit: x.total_deposits, + total_spend: x.total_spent_outside_free_tier, + }; - Ok(Arc::new(RwLock::new(balance))) + return Ok(Arc::new(RwLock::new(x))); + } + None => { + // no balance row. make one now + let db_conn = + self.db_conn().web3_context("Getting database connection")?; + + let balance_entry = balance::ActiveModel { + user_id: sea_orm::Set(user_id), + ..Default::default() + }; + + balance::Entity::insert(balance_entry) + .on_conflict(OnConflict::new().to_owned()) + .exec(&db_conn) + .await?; + + continue; + } + }; + } }) .await .map_err(Into::into),