handle uncles in the background and keep transaction open for a shorter time

This commit is contained in:
Bryan Stitt 2023-07-13 10:13:50 -07:00
parent fa21b7c0a8
commit 28e6be5d9b

View File

@ -5,7 +5,7 @@ use crate::frontend::authorization::{
login_is_authorized, Authorization as Web3ProxyAuthorization,
};
use crate::frontend::users::authentication::register_new_user;
use crate::premium::grant_premium_tier;
use crate::premium::{get_user_and_tier_from_address, grant_premium_tier};
use anyhow::Context;
use axum::{
extract::Path,
@ -17,7 +17,7 @@ use axum_client_ip::InsecureClientIp;
use axum_macros::debug_handler;
use entities::{
admin_increase_balance_receipt, increase_on_chain_balance_receipt,
stripe_increase_balance_receipt, user, user_tier,
stripe_increase_balance_receipt,
};
use ethers::abi::AbiEncode;
use ethers::types::{Address, Block, TransactionReceipt, TxHash, H256};
@ -31,7 +31,7 @@ use payment_contracts::ierc20::IERC20;
use payment_contracts::payment_factory::{self, PaymentFactory};
use serde_json::json;
use std::sync::Arc;
use tracing::{debug, info, trace, warn};
use tracing::{debug, error, info, trace, warn};
/// Implements any logic related to payments
/// Removed this mainly from "user" as this was getting clogged
@ -250,10 +250,20 @@ pub async fn user_balance_post(
.map(|x| serde_json::from_str(x.block_hash.as_str()).unwrap())
.collect();
for uncle_hash in uncle_hashes.into_iter() {
if let Some(x) = handle_uncle_block(&app, &authorization, uncle_hash).await? {
info!("balance changes from uncle: {:#?}", x);
}
{
let app = app.clone();
tokio::spawn(async move {
for uncle_hash in uncle_hashes.into_iter() {
match handle_uncle_block(&app, &authorization, uncle_hash).await {
Ok(x) => {
info!(?x, "balance changes from uncle");
}
Err(err) => {
error!(?err, "handling uncle block");
}
}
}
});
}
if tx_pending {
@ -276,14 +286,12 @@ pub async fn user_balance_post(
// TODO: if the transaction doesn't have enough confirmations yet, add it to a queue to try again later
// 1 confirmation should be fine though
let txn = db_conn.begin().await?;
// if the transaction is already saved, return early
if increase_on_chain_balance_receipt::Entity::find()
.filter(increase_on_chain_balance_receipt::Column::TxHash.eq(tx_hash.encode_hex()))
.filter(increase_on_chain_balance_receipt::Column::ChainId.eq(app.config.chain_id))
.filter(increase_on_chain_balance_receipt::Column::BlockHash.eq(block_hash.encode_hex()))
.one(&txn)
.one(db_conn)
.await?
.is_some()
{
@ -303,11 +311,8 @@ pub async fn user_balance_post(
// TODO: check bloom filters
let mut user_ids_need_premium = HashSet::new();
// the transaction might contain multiple relevant logs. collect them all
let mut response_data = vec![];
let mut user_ids_to_invalidate = HashSet::new();
for log in transaction_receipt.logs {
if let Some(true) = log.removed {
// TODO: do we need to make sure this row is deleted? it should be handled by `handle_uncle_block`
@ -355,18 +360,17 @@ pub async fn user_balance_post(
payment_token_amount
);
let recipient = match user::Entity::find()
.filter(user::Column::Address.eq(recipient_account.as_bytes()))
.one(&txn)
.await?
{
Some(x) => x,
None => {
let (user, _) = register_new_user(&txn, recipient_account).await?;
let txn = db_conn.begin().await?;
user
}
};
let (recipient, recipient_tier) =
match get_user_and_tier_from_address(&recipient_account, &txn).await? {
Some(x) => x,
None => {
let (user, _) = register_new_user(&txn, recipient_account).await?;
(user, None)
}
};
// For now we only accept stablecoins. This will need conversions if we accept other tokens.
// 1$ = Decimal(1) for any stablecoin
@ -394,7 +398,11 @@ pub async fn user_balance_post(
receipt.save(&txn).await?;
user_ids_to_invalidate.insert(recipient.id);
grant_premium_tier(&recipient, recipient_tier.as_ref(), &txn)
.await
.web3_context("granting premium tier")?;
txn.commit().await?;
let x = json!({
"amount": payment_token_amount,
@ -409,33 +417,17 @@ pub async fn user_balance_post(
response_data.push(x);
user_ids_need_premium.insert(recipient);
// invalidate the cache as well
if let Err(err) = app
.user_balance_cache
.invalidate(&recipient.id, db_conn, &app.rpc_secret_key_cache)
.await
{
warn!(?err, user_id=%recipient.id, "unable to invalidate cache");
};
}
}
for user in user_ids_need_premium.into_iter() {
let user_tier = user_tier::Entity::find_by_id(user.user_tier_id)
.one(&txn)
.await?;
grant_premium_tier(&user, user_tier.as_ref(), &txn)
.await
.web3_context("granting premium tier")?;
}
txn.commit().await?;
for user_id in user_ids_to_invalidate.into_iter() {
// Finally invalidate the cache as well
if let Err(err) = app
.user_balance_cache
.invalidate(&user_id, db_conn, &app.rpc_secret_key_cache)
.await
{
warn!(?err, "unable to invalidate caches");
};
}
let response = (StatusCode::CREATED, Json(json!(response_data))).into_response();
Ok(response)