stripe integration first mvp (replaces #140) (#154)

* stripe integration first mvp

* addressing some points from #140

* stripe endpoint needs testing

* addressed some comments

* added Json(payload)

* update toolchain

* lint (well, most of it)

---------

Co-authored-by: yenicelik <david.yenicelik@gmail.com>
This commit is contained in:
Bryan Stitt 2023-06-29 14:32:05 -07:00 committed by GitHub
parent f6c8172a13
commit 581af0d596
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 748 additions and 173 deletions

1
.env

@ -1 +1,2 @@
DATABASE_URL=mysql://root:dev_web3_proxy@127.0.0.1:13306/dev_web3_proxy
STRIPE_API_KEY=

497
Cargo.lock generated

File diff suppressed because it is too large Load Diff

@ -47,6 +47,8 @@ redirect_rpc_key_url = "https://llamanodes.com/dashboard/keys?key={{rpc_key_id}}
# sentry is optional. it is used for browsing error logs
# sentry_url = "https://SENTRY_KEY_A.ingest.sentry.io/SENTRY_KEY_B"
stripe_api_key = ""
# public limits are when no key is used. these are instead grouped by ip
# 0 = block all public requests
public_max_concurrent_requests = 3

@ -18,5 +18,6 @@ pub mod rpc_key;
pub mod sea_orm_active_enums;
pub mod secondary_user;
pub mod serialization;
pub mod stripe_increase_balance_receipt;
pub mod user;
pub mod user_tier;

@ -0,0 +1,39 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.6
use sea_orm::entity::prelude::*;
use serde::Serialize;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize)]
#[sea_orm(table_name = "stripe_increase_balance_receipt")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: u64,
pub deposit_to_user_id: Option<u64>,
pub stripe_payment_intend_id: String,
#[sea_orm(column_type = "Decimal(Some((20, 10)))")]
pub amount: Decimal,
pub currency: String,
pub status: String,
pub description: Option<String>,
pub date_created: DateTimeUtc,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::user::Entity",
from = "Column::DepositToUserId",
to = "super::user::Column::Id",
on_update = "NoAction",
on_delete = "NoAction"
)]
User,
}
impl Related<super::user::Entity> for Entity {
fn to() -> RelationDef {
Relation::User.def()
}
}
impl ActiveModelBehavior for ActiveModel {}

@ -32,6 +32,7 @@ mod m20230607_221917_total_deposits;
mod m20230615_221201_handle_payment_uncles;
mod m20230618_230611_longer_payload;
mod m20230619_172237_default_tracking;
mod m20230622_104142_stripe_deposits;
pub struct Migrator;
@ -71,6 +72,7 @@ impl MigratorTrait for Migrator {
Box::new(m20230615_221201_handle_payment_uncles::Migration),
Box::new(m20230618_230611_longer_payload::Migration),
Box::new(m20230619_172237_default_tracking::Migration),
Box::new(m20230622_104142_stripe_deposits::Migration),
]
}
}

@ -0,0 +1,96 @@
use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// Replace the sample below with your own migration scripts
manager
.create_table(
Table::create()
.table(StripeIncreaseBalanceReceipt::Table)
.col(
ColumnDef::new(StripeIncreaseBalanceReceipt::Id)
.big_unsigned()
.not_null()
.auto_increment()
.primary_key(),
)
.col(
ColumnDef::new(StripeIncreaseBalanceReceipt::StripePaymentIntendId)
.string()
.not_null(),
)
.col(
// I will not mark this as "not null", so we can track who to refund more easily if needed
ColumnDef::new(StripeIncreaseBalanceReceipt::DepositToUserId)
.big_unsigned(),
)
.foreign_key(
ForeignKey::create()
.name("fk-stripe_deposits_to_user_id")
.from(
StripeIncreaseBalanceReceipt::Table,
StripeIncreaseBalanceReceipt::DepositToUserId,
)
.to(User::Table, User::Id),
)
.col(
ColumnDef::new(StripeIncreaseBalanceReceipt::Amount)
.decimal_len(20, 10)
.not_null(),
)
.col(
ColumnDef::new(StripeIncreaseBalanceReceipt::Currency)
.string()
.not_null(),
)
.col(
ColumnDef::new(StripeIncreaseBalanceReceipt::Status)
.string()
.not_null(),
)
.col(ColumnDef::new(StripeIncreaseBalanceReceipt::Description).string())
.col(
ColumnDef::new(StripeIncreaseBalanceReceipt::DateCreated)
.timestamp()
.not_null(),
)
.to_owned(),
)
.await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(
Table::drop()
.table(StripeIncreaseBalanceReceipt::Table)
.to_owned(),
)
.await
}
}
/// Learn more at https://docs.rs/sea-query#iden
#[derive(Iden)]
enum User {
Table,
Id,
}
/// Learn more at https://docs.rs/sea-query#iden
#[derive(Iden)]
enum StripeIncreaseBalanceReceipt {
Table,
Id,
StripePaymentIntendId,
Amount,
Currency,
Status,
DepositToUserId,
Description,
DateCreated,
}

@ -82,13 +82,15 @@ regex = "1.8.4"
reqwest = { version = "0.11.18", default-features = false, features = ["deflate", "gzip", "json", "tokio-rustls"] }
rmp-serde = "1.1.1"
rust_decimal = { version = "1.30.0", features = ["maths"] }
async-stripe = { version = "0.22.2", features = ["runtime-tokio-hyper-rustls"] }
sentry = { version = "0.31.5", default-features = false, features = ["anyhow", "backtrace", "contexts", "panic", "reqwest", "rustls", "serde_json", "tracing"] }
sentry-tracing = "0.31.5"
serde = { version = "1.0.164" }
serde_json = { version = "1.0.99", default-features = false, features = ["raw_value"] }
serde_prometheus = "0.2.3"
strum = { version = "0.25.0", features = ["derive"] }
time = { version = "0.3.22" }
time_01 = { package = "time", version = "0.1" }
time_03 = { package = "time", version = "0.3" }
tokio = { version = "1.29.0", features = ["full", "tracing"] }
tokio-console = { version = "0.1.8", optional = true }
tokio-stream = { version = "0.1.14", features = ["sync"] }

@ -166,6 +166,9 @@ pub struct AppConfig {
/// Optionally send errors to <https://sentry.io>
pub sentry_url: Option<Dsn>,
/// Stripe api key from environment variable,
pub stripe_api_key: Option<String>,
/// Track rate limits in a redis (or compatible backend)
/// It is okay if this data is lost.
pub volatile_redis_url: Option<String>,

@ -34,7 +34,7 @@ use siwe::{Message, VerificationOpts};
use std::ops::Add;
use std::str::FromStr;
use std::sync::Arc;
use time::{Duration, OffsetDateTime};
use time_03::{Duration, OffsetDateTime};
use tracing::{debug, info, warn};
use ulid::Ulid;

@ -168,6 +168,11 @@ pub async fn serve(
.route("/user", post(users::user_post))
.route("/user/balance", get(users::payment::user_balance_get))
.route("/user/deposits", get(users::payment::user_deposits_get))
.route(
"/user/balance/stripe",
get(users::payment_stripe::user_stripe_deposits_get)
.post(users::payment_stripe::user_balance_stripe_post),
)
.route(
"/user/balance/:tx_hash",
post(users::payment::user_balance_post),

@ -28,7 +28,7 @@ use siwe::{Message, VerificationOpts};
use std::ops::Add;
use std::str::FromStr;
use std::sync::Arc;
use time::{Duration, OffsetDateTime};
use time_03::{Duration, OffsetDateTime};
use tracing::{error, trace, warn};
use ulid::Ulid;

@ -1,6 +1,7 @@
//! Handle registration, logins, and managing account data.
pub mod authentication;
pub mod payment;
pub mod payment_stripe;
pub mod referral;
pub mod rpc_keys;
pub mod stats;

@ -0,0 +1,250 @@
use crate::app::Web3ProxyApp;
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResponse};
use crate::frontend::authorization::{
login_is_authorized, Authorization as Web3ProxyAuthorization,
};
use anyhow::Context;
use axum::{
headers::{authorization::Bearer, Authorization},
response::IntoResponse,
Extension, Json, TypedHeader,
};
use axum_client_ip::InsecureClientIp;
use axum_macros::debug_handler;
use entities::{
balance, increase_on_chain_balance_receipt, rpc_key, stripe_increase_balance_receipt, user,
};
use ethers::types::Address;
use http::HeaderMap;
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::{
self, ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, TransactionTrait,
};
use migration::{Expr, OnConflict};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::num::NonZeroU64;
use std::sync::Arc;
use stripe::Webhook;
use tracing::{debug, error, trace};
/// `GET /user/balance/stripe` -- Use a bearer token to get the user's balance and spend.
///
/// - shows a list of all stripe deposits, all fields from entity
#[debug_handler]
pub async fn user_stripe_deposits_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
) -> Web3ProxyResponse {
let (user, _semaphore) = app.bearer_is_authorized(bearer).await?;
let db_replica = app.db_replica().context("Getting database connection")?;
// Filter by user ...
let receipts = stripe_increase_balance_receipt::Entity::find()
.filter(increase_on_chain_balance_receipt::Column::DepositToUserId.eq(user.id))
.all(db_replica.as_ref())
.await?;
// Return the response, all except the user ...
let response = json!({
"user": Address::from_slice(&user.address),
"deposits": receipts,
});
Ok(Json(response).into_response())
}
// /// the JSON input to the `post_user` handler.
// /// TODO: what else can we update here? password hash? subscription to newsletter?
#[derive(Debug, Serialize, Deserialize)]
pub struct StripePost {
// email: Option<String>,
// referral_code: Option<String>,
data: Box<serde_json::value::RawValue>,
}
/// `POST /user/balance/stripe` -- Process a stripe transaction;
/// this endpoint is called from the webhook with the user_id parameter in the request
#[debug_handler]
pub async fn user_balance_stripe_post(
Extension(app): Extension<Arc<Web3ProxyApp>>,
InsecureClientIp(ip): InsecureClientIp,
headers: HeaderMap,
bearer: Option<TypedHeader<Authorization<Bearer>>>,
Json(payload): Json<StripePost>,
) -> Web3ProxyResponse {
// rate limit by bearer token **OR** IP address
let (_, _semaphore) = if let Some(TypedHeader(Authorization(bearer))) = bearer {
let (_, semaphore) = app.bearer_is_authorized(bearer).await?;
// TODO: is handling this as internal fine?
let authorization = Web3ProxyAuthorization::internal(app.db_conn().ok().cloned())?;
(authorization, Some(semaphore))
} else {
let authorization = login_is_authorized(&app, ip).await?;
(authorization, None)
};
// let recipient_user_id: u64 = params
// .remove("user_id")
// .ok_or(Web3ProxyError::BadRouting)?
// .parse()
// .or(Err(Web3ProxyError::ParseAddressError))?;
trace!(?payload);
// Get the payload, and the header
// let payload = payload.data.get("data").ok_or(Web3ProxyError::BadRequest(
// "You have not provided a 'data' for the Stripe payload".into(),
// ))?;
// TODO Get this from the header
let signature = headers
.get("STRIPE_SIGNATURE")
.ok_or(Web3ProxyError::BadRequest(
"You have not provided a 'STRIPE_SIGNATURE' for the Stripe payload".into(),
))?
.to_str()
.web3_context("Could not parse stripe signature as byte-string")?;
// Now parse the payload and signature
// TODO: Move env variable elsewhere
let event = Webhook::construct_event(
payload.data.get(),
signature,
app.config
.stripe_api_key
.clone()
.web3_context("Stripe API key not found in config!")?
.as_str(),
)
.context(Web3ProxyError::BadRequest(
"Could not parse the stripe webhook request!".into(),
))?;
let intent = match event.data.object {
stripe::EventObject::PaymentIntent(intent) => intent,
_ => return Ok("Received irrelevant webhook".into_response()),
};
debug!("Found PaymentIntent Event: {:?}", intent);
if intent.status.as_str() != "succeeded" {
return Ok("Received Webhook".into_response());
}
let db_conn = app.db_conn().context("query_user_stats needs a db")?;
if stripe_increase_balance_receipt::Entity::find()
.filter(
stripe_increase_balance_receipt::Column::StripePaymentIntendId.eq(intent.id.as_str()),
)
.one(db_conn)
.await?
.is_some()
{
return Ok("Payment was already recorded".into_response());
};
// Try to get the recipient_user_id from the data metadata
let recipient_user_id = match intent.metadata.get("user_id") {
Some(x) => Ok(x.parse::<u64>()),
None => Err(Web3ProxyError::BadRequest(
"Could not find user_id in the stripe webhook request!".into(),
)),
}?
.context(Web3ProxyError::BadRequest(
"Could not parse the stripe webhook request user_id!".into(),
))?;
let recipient: Option<user::Model> = user::Entity::find_by_id(recipient_user_id)
.one(db_conn)
.await?;
// we do a fixed 2 decimal points because we only accept USD for now
let amount = Decimal::new(intent.amount, 2);
let recipient_id: Option<u64> = recipient.as_ref().map(|x| x.id);
let insert_receipt_model = stripe_increase_balance_receipt::ActiveModel {
id: Default::default(),
deposit_to_user_id: sea_orm::Set(recipient_id),
amount: sea_orm::Set(amount),
stripe_payment_intend_id: sea_orm::Set(intent.id.as_str().to_string()),
currency: sea_orm::Set(intent.currency.to_string()),
status: sea_orm::Set(intent.status.to_string()),
description: sea_orm::Set(intent.description),
date_created: Default::default(),
};
// In all these cases, we should record the transaction, but not increase the balance
let txn = db_conn.begin().await?;
// Assert that it's usd
if intent.currency.to_string() != "USD" || recipient.is_none() {
// In this case I should probably still save it to the database,
// but not increase balance (this should be refunded)
// TODO: I suppose we could send a refund request right away from here
error!(
currency=%intent.currency, %recipient_user_id, %intent.id,
"Please refund this transaction!",
);
let _ = insert_receipt_model.save(&txn).await;
txn.commit().await?;
return Ok("Received Webhook".into_response());
}
// Otherwise, also increase the balance ...
match recipient {
Some(recipient) => {
// Create a balance update as well
let balance_entry = balance::ActiveModel {
id: sea_orm::NotSet,
total_deposits: sea_orm::Set(amount),
user_id: sea_orm::Set(recipient.id),
..Default::default()
};
trace!(?balance_entry, "Trying to insert into balance entry");
balance::Entity::insert(balance_entry)
.on_conflict(
OnConflict::new()
.values([(
balance::Column::TotalDeposits,
Expr::col(balance::Column::TotalDeposits).add(amount),
)])
.to_owned(),
)
.exec(&txn)
.await
.web3_context("increasing balance")?;
let _ = insert_receipt_model.save(&txn).await;
let recipient_rpc_keys = rpc_key::Entity::find()
.filter(rpc_key::Column::UserId.eq(recipient.id))
.all(&txn)
.await?;
txn.commit().await?;
// Finally invalidate the cache as well
match NonZeroU64::try_from(recipient.id) {
Err(_) => {}
Ok(x) => {
app.user_balance_cache.invalidate(&x).await;
}
};
for rpc_key_entity in recipient_rpc_keys {
app.rpc_secret_key_cache
.invalidate(&rpc_key_entity.secret_key.into())
.await;
}
}
None => {
return Err(Web3ProxyError::BadResponse(
"We just checked if the recipient is not none, it should've branched before!"
.into(),
))
}
};
Ok("Received webhook".into_response())
}

@ -8,7 +8,7 @@ use std::{
hash::{Hash, Hasher},
panic::PanicInfo,
};
use time::OffsetDateTime;
use time_03::OffsetDateTime;
use tracing::{debug, error, warn};
/*

@ -25,19 +25,19 @@ use web3_proxy::{
pub struct TestApp {
/// anvil shuts down when this guard is dropped.
anvil: AnvilInstance,
/// spawn handle for the proxy.
handle: Mutex<Option<JoinHandle<anyhow::Result<()>>>>,
/// tell the app to shut down (use `self.stop()`).
shutdown_sender: broadcast::Sender<()>,
pub anvil: AnvilInstance,
/// connection to anvil.
pub anvil_provider: Provider<Http>,
/// spawn handle for the proxy.
pub handle: Mutex<Option<JoinHandle<anyhow::Result<()>>>>,
/// connection to the proxy that is connected to anil.
pub proxy_provider: Provider<Http>,
/// tell the app to shut down (use `self.stop()`).
shutdown_sender: broadcast::Sender<()>,
}
impl TestApp {