From 28e6be5d9b5c06ac728a8dcbec874aad17f67ed4 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 13 Jul 2023 10:13:50 -0700 Subject: [PATCH] handle uncles in the background and keep transaction open for a shorter time --- web3_proxy/src/frontend/users/payment.rs | 90 +++++++++++------------- 1 file changed, 41 insertions(+), 49 deletions(-) diff --git a/web3_proxy/src/frontend/users/payment.rs b/web3_proxy/src/frontend/users/payment.rs index eb22314f..897be766 100644 --- a/web3_proxy/src/frontend/users/payment.rs +++ b/web3_proxy/src/frontend/users/payment.rs @@ -5,7 +5,7 @@ use crate::frontend::authorization::{ login_is_authorized, Authorization as Web3ProxyAuthorization, }; use crate::frontend::users::authentication::register_new_user; -use crate::premium::grant_premium_tier; +use crate::premium::{get_user_and_tier_from_address, grant_premium_tier}; use anyhow::Context; use axum::{ extract::Path, @@ -17,7 +17,7 @@ use axum_client_ip::InsecureClientIp; use axum_macros::debug_handler; use entities::{ admin_increase_balance_receipt, increase_on_chain_balance_receipt, - stripe_increase_balance_receipt, user, user_tier, + stripe_increase_balance_receipt, }; use ethers::abi::AbiEncode; use ethers::types::{Address, Block, TransactionReceipt, TxHash, H256}; @@ -31,7 +31,7 @@ use payment_contracts::ierc20::IERC20; use payment_contracts::payment_factory::{self, PaymentFactory}; use serde_json::json; use std::sync::Arc; -use tracing::{debug, info, trace, warn}; +use tracing::{debug, error, info, trace, warn}; /// Implements any logic related to payments /// Removed this mainly from "user" as this was getting clogged @@ -250,10 +250,20 @@ pub async fn user_balance_post( .map(|x| serde_json::from_str(x.block_hash.as_str()).unwrap()) .collect(); - for uncle_hash in uncle_hashes.into_iter() { - if let Some(x) = handle_uncle_block(&app, &authorization, uncle_hash).await? { - info!("balance changes from uncle: {:#?}", x); - } + { + let app = app.clone(); + tokio::spawn(async move { + for uncle_hash in uncle_hashes.into_iter() { + match handle_uncle_block(&app, &authorization, uncle_hash).await { + Ok(x) => { + info!(?x, "balance changes from uncle"); + } + Err(err) => { + error!(?err, "handling uncle block"); + } + } + } + }); } if tx_pending { @@ -276,14 +286,12 @@ pub async fn user_balance_post( // TODO: if the transaction doesn't have enough confirmations yet, add it to a queue to try again later // 1 confirmation should be fine though - let txn = db_conn.begin().await?; - // if the transaction is already saved, return early if increase_on_chain_balance_receipt::Entity::find() .filter(increase_on_chain_balance_receipt::Column::TxHash.eq(tx_hash.encode_hex())) .filter(increase_on_chain_balance_receipt::Column::ChainId.eq(app.config.chain_id)) .filter(increase_on_chain_balance_receipt::Column::BlockHash.eq(block_hash.encode_hex())) - .one(&txn) + .one(db_conn) .await? .is_some() { @@ -303,11 +311,8 @@ pub async fn user_balance_post( // TODO: check bloom filters - let mut user_ids_need_premium = HashSet::new(); - // the transaction might contain multiple relevant logs. collect them all let mut response_data = vec![]; - let mut user_ids_to_invalidate = HashSet::new(); for log in transaction_receipt.logs { if let Some(true) = log.removed { // TODO: do we need to make sure this row is deleted? it should be handled by `handle_uncle_block` @@ -355,18 +360,17 @@ pub async fn user_balance_post( payment_token_amount ); - let recipient = match user::Entity::find() - .filter(user::Column::Address.eq(recipient_account.as_bytes())) - .one(&txn) - .await? - { - Some(x) => x, - None => { - let (user, _) = register_new_user(&txn, recipient_account).await?; + let txn = db_conn.begin().await?; - user - } - }; + let (recipient, recipient_tier) = + match get_user_and_tier_from_address(&recipient_account, &txn).await? { + Some(x) => x, + None => { + let (user, _) = register_new_user(&txn, recipient_account).await?; + + (user, None) + } + }; // For now we only accept stablecoins. This will need conversions if we accept other tokens. // 1$ = Decimal(1) for any stablecoin @@ -394,7 +398,11 @@ pub async fn user_balance_post( receipt.save(&txn).await?; - user_ids_to_invalidate.insert(recipient.id); + grant_premium_tier(&recipient, recipient_tier.as_ref(), &txn) + .await + .web3_context("granting premium tier")?; + + txn.commit().await?; let x = json!({ "amount": payment_token_amount, @@ -409,33 +417,17 @@ pub async fn user_balance_post( response_data.push(x); - user_ids_need_premium.insert(recipient); + // invalidate the cache as well + if let Err(err) = app + .user_balance_cache + .invalidate(&recipient.id, db_conn, &app.rpc_secret_key_cache) + .await + { + warn!(?err, user_id=%recipient.id, "unable to invalidate cache"); + }; } } - for user in user_ids_need_premium.into_iter() { - let user_tier = user_tier::Entity::find_by_id(user.user_tier_id) - .one(&txn) - .await?; - - grant_premium_tier(&user, user_tier.as_ref(), &txn) - .await - .web3_context("granting premium tier")?; - } - - txn.commit().await?; - - for user_id in user_ids_to_invalidate.into_iter() { - // Finally invalidate the cache as well - if let Err(err) = app - .user_balance_cache - .invalidate(&user_id, db_conn, &app.rpc_secret_key_cache) - .await - { - warn!(?err, "unable to invalidate caches"); - }; - } - let response = (StatusCode::CREATED, Json(json!(response_data))).into_response(); Ok(response)