use bloom filters and support transactions with multiple deposit events

This commit is contained in:
Bryan Stitt 2023-05-31 11:20:17 -07:00
parent b9f0824dfe
commit 7947cb95ff
7 changed files with 179 additions and 186 deletions

2
Cargo.lock generated

@ -6499,6 +6499,7 @@ dependencies = [
"derive_more",
"entities",
"env_logger",
"ethbloom",
"ethers",
"ewma",
"fdlimit",
@ -6535,6 +6536,7 @@ dependencies = [
"regex",
"reqwest",
"rmp-serde",
"rust_decimal",
"sentry",
"serde",
"serde_json",

@ -32,6 +32,7 @@ influxdb2-structmap = { git = "https://github.com/llamanodes/influxdb2/"}
# TODO: import chrono from sea-orm so we always have the same version
# TODO: make sure this time version matches siwe. PR to put this in their prelude
# TODO: rdkafka has a tracing feature
# TODO: axum has a tracing feature
# TODO: hdrhistogram for automated tiers
@ -47,6 +48,7 @@ console-subscriber = { version = "0.1.9", optional = true }
counter = "0.5.7"
derive_more = "0.99.17"
env_logger = "0.10.0"
ethbloom = "0.13.0"
ethers = { version = "2.0.6", default-features = false, features = ["rustls", "ws"] }
ewma = "0.1.1"
fdlimit = "0.2.1"
@ -77,6 +79,7 @@ rdkafka = { version = "0.31.0" }
regex = "1.8.3"
reqwest = { version = "0.11.18", default-features = false, features = ["deflate", "gzip", "json", "tokio-rustls"] }
rmp-serde = "1.1.1"
rust_decimal = { version = "1.29.1", features = ["maths"] }
sentry = { version = "0.31.3", default-features = false, features = ["backtrace", "contexts", "panic", "anyhow", "reqwest", "rustls", "log", "sentry-log"] }
serde = { version = "1.0.163", features = [] }
serde_json = { version = "1.0.96", default-features = false, features = ["alloc", "raw_value"] }

@ -937,13 +937,13 @@ impl Web3ProxyApp {
}
/// this is way more round-a-bout than we want, but it means stats are emitted and caches are used
/// request_with_caching
pub async fn authorized_request<P: JsonRpcParams, R: JsonRpcResultData>(
self: &Arc<Self>,
method: &str,
params: P,
authorization: Arc<Authorization>,
) -> Web3ProxyResult<R> {
// TODO: proper ids
let request = JsonRpcRequest::new(JsonRpcId::Number(1), method.to_string(), json!(params))?;
let (_, response, _) = self.proxy_request(request, authorization, None).await;

@ -3,10 +3,7 @@
use crate::frontend::authorization::Authorization;
use crate::jsonrpc::{JsonRpcErrorData, JsonRpcForwardedResponse};
use crate::response_cache::JsonRpcResponseEnum;
use std::error::Error;
use std::{borrow::Cow, net::IpAddr};
use crate::rpcs::provider::EthersHttpProvider;
use axum::{
headers,
http::StatusCode,
@ -14,14 +11,18 @@ use axum::{
Json,
};
use derive_more::{Display, Error, From};
use ethers::prelude::ContractError;
use http::header::InvalidHeaderValue;
use ipnet::AddrParseError;
use log::{debug, error, info, trace, warn};
use migration::sea_orm::DbErr;
use redis_rate_limiter::redis::RedisError;
use reqwest::header::ToStrError;
use rust_decimal::Error as DecimalError;
use serde::Serialize;
use serde_json::value::RawValue;
use std::error::Error;
use std::{borrow::Cow, net::IpAddr};
use tokio::{sync::AcquireError, task::JoinError, time::Instant};
pub type Web3ProxyResult<T> = Result<T, Web3ProxyError>;
@ -34,6 +35,7 @@ impl From<Web3ProxyError> for Web3ProxyResult<()> {
}
}
// TODO: replace all String with `Cow<'static, str>`
#[derive(Debug, Display, Error, From)]
pub enum Web3ProxyError {
AccessDenied,
@ -46,7 +48,9 @@ pub enum Web3ProxyError {
#[from(ignore)]
BadResponse(String),
BadRouting,
Contract(ContractError<EthersHttpProvider>),
Database(DbErr),
Decimal(DecimalError),
#[display(fmt = "{:#?}, {:#?}", _0, _1)]
EipVerificationFailed(Box<Web3ProxyError>, Box<Web3ProxyError>),
EthersHttpClient(ethers::prelude::HttpClientError),
@ -220,6 +224,28 @@ impl Web3ProxyError {
},
)
}
Self::Contract(err) => {
warn!("Contract Error: {}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcErrorData {
message: Cow::Owned(format!("contract error: {}", err)),
code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(),
data: None,
},
)
}
Self::Decimal(err) => {
debug!("Decimal Error: {}", err);
(
StatusCode::BAD_REQUEST,
JsonRpcErrorData {
message: Cow::Owned(format!("decimal error: {}", err)),
code: StatusCode::BAD_REQUEST.as_u16().into(),
data: None,
},
)
}
Self::EipVerificationFailed(err_1, err_191) => {
info!(
"EipVerificationFailed err_1={:#?} err2={:#?}",

@ -1,6 +1,6 @@
use crate::app::Web3ProxyApp;
use crate::errors::{Web3ProxyError, Web3ProxyResponse};
use anyhow::{anyhow, Context};
use anyhow::Context;
use axum::{
extract::Path,
headers::{authorization::Bearer, Authorization},
@ -8,29 +8,29 @@ use axum::{
Extension, Json, TypedHeader,
};
use axum_macros::debug_handler;
use entities::{balance, increase_on_chain_balance_receipt, user, user_tier};
use entities::{balance, increase_on_chain_balance_receipt, user};
use ethbloom::Input as BloomInput;
use ethers::abi::{AbiEncode, ParamType};
use ethers::prelude::abigen;
use ethers::types::{Address, TransactionReceipt, H256, U256};
use hashbrown::HashMap;
// use http::StatusCode;
use http::StatusCode;
use log::{debug, info, trace, warn};
// use migration::sea_orm;
// use migration::sea_orm::prelude::Decimal;
// use migration::sea_orm::ActiveModelTrait;
use migration::sea_orm::ColumnTrait;
use migration::sea_orm::EntityTrait;
// use migration::sea_orm::IntoActiveModel;
use migration::sea_orm::QueryFilter;
// use migration::sea_orm::TransactionTrait;
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::{
self, ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter,
TransactionTrait,
};
use num_traits::Pow;
use serde_json::json;
use std::str::FromStr;
use std::sync::Arc;
// TODO: do this in a build.rs so that the editor autocomplete and docs are better
abigen!(
IERC20,
r#"[
decimals() -> uint256
decimals() external view returns (uint256)
event Transfer(address indexed from, address indexed to, uint256 value)
event Approval(address indexed owner, address indexed spender, uint256 value)
]"#,
@ -40,8 +40,8 @@ abigen!(
PaymentFactory,
r#"[
event PaymentReceived(address indexed account, address token, uint256 amount)
account_to_payment_address(address) -> address
payment_address_to_account(address) -> address
account_to_payment_address(address) external view returns (address)
payment_address_to_account(address) external view returns (address)
]"#,
);
@ -110,6 +110,7 @@ pub async fn user_deposits_get(
out.insert("amount", serde_json::Value::String(x.amount.to_string()));
out.insert("chain_id", serde_json::Value::Number(x.chain_id.into()));
out.insert("tx_hash", serde_json::Value::String(x.tx_hash));
// TODO: log_index
out
})
.collect::<Vec<_>>();
@ -158,6 +159,7 @@ pub async fn user_balance_post(
.await?
.is_some()
{
// this will be status code 200, not 204
let response = Json(json!({
"result": "success",
"message": "this transaction was already in the database",
@ -168,17 +170,12 @@ pub async fn user_balance_post(
}
// get the transaction receipt
let transaction_receipt: Option<TransactionReceipt> = app
.internal_request("eth_getTransactionReceipt", (tx_hash,))
.await?;
let transaction_receipt = if let Some(transaction_receipt) = transaction_receipt {
transaction_receipt
} else {
return Err(Web3ProxyError::BadRequest(
let transaction_receipt = app
.internal_request::<_, Option<TransactionReceipt>>("eth_getTransactionReceipt", (tx_hash,))
.await?
.ok_or(Web3ProxyError::BadRequest(
format!("transaction receipt not found for {}", tx_hash,).into(),
));
};
))?;
trace!("Transaction receipt: {:#?}", transaction_receipt);
@ -192,76 +189,56 @@ pub async fn user_balance_post(
let payment_factory =
PaymentFactory::new(payment_factory_address, app.internal_provider().clone());
// there is no need to check accepted tokens. the smart contract already reverts if the token isn't accepted
// TODO: this should be in the abigen stuff somewhere
// let payment_factory_deposit_topic = payment_factory.something?;
let payment_factory_deposit_topic = app
.config
.deposit_topic
.context("A deposit_topic must be provided in the config to parse payments")?;
// let deposit_log = payment_factory.something?;
let bloom_input = BloomInput::Raw(payment_factory_deposit_topic.as_bytes());
// // TODO: do a quick check that this transaction contains the required log
// if !transaction_receipt.logs_bloom.contains_input(deposit_log) {
// return Err(Web3ProxyError::BadRequest("no matching logs found".into()));
// }
// do a quick check that this transaction contains the required log
if !transaction_receipt.logs_bloom.contains_input(bloom_input) {
return Err(Web3ProxyError::BadRequest("no matching logs found".into()));
}
let mut response_data = vec![];
let txn = db_conn.begin().await?;
// parse the logs from the transaction receipt
// there might be multiple logs with the event if the transaction is doing things in bulk
// TODO: change the indexes to be unique on (chain, txhash, log_index)
for log in transaction_receipt.logs {
// TODO: use abigen to make this simpler?
if log.address != payment_factory_address {
continue;
}
// TODO: check the log topic matches our factory
// TODO: check the log send matches our factory
let log_index = log
.log_index
.context("no log_index. transaction must not be confirmed")?;
// TODO: get the payment token address out of the event
let payment_token_address = Address::zero();
// TODO: get the payment token amount out of the event (wei = the integer unit)
let payment_token_wei = U256::zero();
let payment_token = IERC20::new(payment_token_address, app.internal_provider().clone());
// TODO: get the account the payment was received on behalf of (any account could have sent it)
let on_behalf_of_address = Address::zero();
// get the decimals for the token
let payment_token_decimals = payment_token.decimals().call().await;
todo!("now what?");
}
todo!("now what?");
/*
for log in transaction_receipt.logs {
if log.address != deposit_contract {
debug!(
"Out: Log is not relevant, as it is not directed to the deposit contract {:?} {:?}",
format!("{:?}", log.address),
deposit_contract
trace!(
"Out: Address is not relevant: {:?} {:?}",
log.address,
payment_factory_address,
);
continue;
}
// Get the topics out
let topic: H256 = log.topics.get(0).unwrap().to_owned();
if topic != deposit_topic {
debug!(
// TODO: use abigen to make this simpler?
let topic = log.topics.get(0).unwrap();
if *topic != payment_factory_deposit_topic {
trace!(
"Out: Topic is not relevant: {:?} {:?}",
topic, deposit_topic
topic,
payment_factory_deposit_topic,
);
continue;
}
// TODO: Will this work? Depends how logs are encoded
let (recipient_account, token, amount): (Address, Address, U256) = match ethers::abi::decode(
&[
ParamType::Address,
ParamType::Address,
ParamType::Uint(256usize),
],
// TODO: use abigen to make this simpler
let (recipient_account, payment_token_address, payment_token_wei): (
Address,
Address,
U256,
) = match ethers::abi::decode(
&[ParamType::Address, ParamType::Address, ParamType::Uint(256)],
&log.data,
) {
Ok(tpl) => (
@ -282,132 +259,116 @@ pub async fn user_balance_post(
.context("Could not decode amount")?,
),
Err(err) => {
warn!("Out: Could not decode! {:?}", err);
trace!("Out: Could not decode! {:?}", err);
continue;
}
};
// return early if amount is 0
if amount == U256::from(0) {
warn!(
"Out: Found log has amount = 0 {:?}. This should never be the case according to the smart contract",
amount
);
continue;
}
// there is no need to check that payment_token_address is an allowed token
// the smart contract already reverts if the token isn't accepted
// we used to skip here if amount is 0, but that means the txid wouldn't ever show up in the database which could be confusing
// also, the contract already reverts for 0 value
let log_index = log
.log_index
.context("no log_index. transaction must not be confirmed")?;
// the internal provider will handle caching
let payment_token = IERC20::new(payment_token_address, app.internal_provider().clone());
// get the decimals for the token
let payment_token_decimals = payment_token.decimals().call().await?;
// TODO: how should we do U256 to Decimal?
let decimal_shift = Decimal::from(10).pow(payment_token_decimals.as_u64());
let mut payment_token_amount =
Decimal::from_str(&format!("{}", payment_token_wei)).unwrap();
payment_token_amount.set_scale(payment_token_decimals.as_u32())?;
payment_token_amount /= decimal_shift;
info!(
"Found deposit transaction for: {:?} {:?} {:?}",
recipient_account, token, amount
recipient_account, payment_token_address, payment_token_amount
);
// Encoding is inefficient, revisit later
let recipient = match user::Entity::find()
.filter(user::Column::Address.eq(recipient_account.encode_hex()))
.one(db_replica.as_ref())
.one(&db_conn)
.await?
{
Some(x) => Ok(x),
None => Err(Web3ProxyError::BadRequest(
"The user must have signed up first. They are currently not signed up!".to_string(),
)),
}?;
Some(x) => x,
None => todo!("make their account"),
};
// For now we only accept stablecoins
// And we hardcode the peg (later we would have to depeg this, for example
// 1$ = Decimal(1) for any stablecoin
// TODO: Let's assume that people don't buy too much at _once_, we do support >$1M which should be fine for now
debug!("Arithmetic is: {:?} {:?}", amount, decimals);
debug!(
"Decimals arithmetic is: {:?} {:?}",
Decimal::from(amount.as_u128()),
Decimal::from(10_u64.pow(decimals))
);
let mut amount = Decimal::from(amount.as_u128());
let _ = amount.set_scale(decimals);
debug!("Amount is: {:?}", amount);
// For now we only accept stablecoins
// And we hardcode the peg (later we would have to depeg this, for example
// 1$ = Decimal(1) for any stablecoin
// TODO: Let's assume that people don't buy too much at _once_, we do support >$1M which should be fine for now
debug!(
"Arithmetic is: {:?} / 10 ^ {:?} = {:?}",
payment_token_wei, payment_token_decimals, payment_token_amount
);
// Check if the item is in the database. If it is not, then add it into the database
let user_balance = balance::Entity::find()
.filter(balance::Column::UserId.eq(recipient.id))
.one(&db_conn)
.await?;
// Check if the item is in the database. If it is not, then add it into the database
// TODO: select ... for update
let user_balance = balance::Entity::find()
.filter(balance::Column::UserId.eq(recipient.id))
.one(&txn)
.await?;
// Get the premium user-tier
let premium_user_tier = user_tier::Entity::find()
.filter(user_tier::Column::Title.eq("Premium"))
.one(&db_conn)
.await?
.context("Could not find 'Premium' Tier in user-database")?;
match user_balance {
Some(user_balance) => {
// Update the entry, adding the balance
let balance_plus_amount = user_balance.available_balance + payment_token_amount;
let txn = db_conn.begin().await?;
match user_balance {
Some(user_balance) => {
let balance_plus_amount = user_balance.available_balance + amount;
info!("New user balance is: {:?}", balance_plus_amount);
// Update the entry, adding the balance
let mut active_user_balance = user_balance.into_active_model();
active_user_balance.available_balance = sea_orm::Set(balance_plus_amount);
let mut active_user_balance = user_balance.into_active_model();
active_user_balance.available_balance = sea_orm::Set(balance_plus_amount);
if balance_plus_amount >= Decimal::new(10, 0) {
// Also make the user premium at this point ...
let mut active_recipient = recipient.clone().into_active_model();
// Make the recipient premium "Effectively Unlimited"
active_recipient.user_tier_id = sea_orm::Set(premium_user_tier.id);
active_recipient.save(&txn).await?;
}
debug!("New user balance: {:?}", active_user_balance);
active_user_balance.save(&txn).await?;
}
None => {
// Create the entry with the respective balance
let active_user_balance = balance::ActiveModel {
available_balance: sea_orm::ActiveValue::Set(payment_token_amount),
user_id: sea_orm::ActiveValue::Set(recipient.id),
..Default::default()
};
debug!("New user balance model is: {:?}", active_user_balance);
active_user_balance.save(&txn).await?;
// txn.commit().await?;
// user_balance
}
None => {
// Create the entry with the respective balance
let active_user_balance = balance::ActiveModel {
available_balance: sea_orm::ActiveValue::Set(amount),
user_id: sea_orm::ActiveValue::Set(recipient.id),
..Default::default()
};
debug!("New user balance: {:?}", active_user_balance);
active_user_balance.save(&txn).await?;
}
};
if amount >= Decimal::new(10, 0) {
// Also make the user premium at this point ...
let mut active_recipient = recipient.clone().into_active_model();
// Make the recipient premium "Effectively Unlimited"
active_recipient.user_tier_id = sea_orm::Set(premium_user_tier.id);
active_recipient.save(&txn).await?;
}
debug!("Setting tx_hash: {:?}", tx_hash);
let receipt = increase_on_chain_balance_receipt::ActiveModel {
tx_hash: sea_orm::ActiveValue::Set(tx_hash.encode_hex()),
chain_id: sea_orm::ActiveValue::Set(app.config.chain_id),
// TODO: log_index
amount: sea_orm::ActiveValue::Set(payment_token_amount),
deposit_to_user_id: sea_orm::ActiveValue::Set(recipient.id),
..Default::default()
};
info!("New user balance model is: {:?}", active_user_balance);
active_user_balance.save(&txn).await?;
// txn.commit().await?;
// user_balance // .try_into_model().unwrap()
}
};
debug!("Setting tx_hash: {:?}", tx_hash);
let receipt = increase_on_chain_balance_receipt::ActiveModel {
tx_hash: sea_orm::ActiveValue::Set(tx_hash.encode_hex()),
chain_id: sea_orm::ActiveValue::Set(app.config.chain_id),
amount: sea_orm::ActiveValue::Set(amount),
deposit_to_user_id: sea_orm::ActiveValue::Set(recipient.id),
..Default::default()
};
receipt.save(&txn).await?;
receipt.save(&txn).await?;
txn.commit().await?;
debug!("Saved to db");
let x = json!({
"tx_hash": tx_hash,
"log_index": log_index,
"token": payment_token_address,
"amount": payment_token_amount,
});
let response = (
StatusCode::CREATED,
Json(json!({
"tx_hash": tx_hash,
"amount": amount
})),
)
.into_response();
response_data.push(x);
}
// Return early if the log was added, assume there is at most one valid log per transaction
return Ok(response);
}
*/
txn.commit().await?;
debug!("Saved to db");
let response = (StatusCode::CREATED, Json(json!(response_data))).into_response();
Ok(response)
}

@ -14,6 +14,7 @@ pub use migration::sea_orm::DatabaseConnection;
#[derive(Clone, From)]
pub struct DatabaseReplica(DatabaseConnection);
// TODO: this still doesn't work like i want. I want to be able to do `query.one(&DatabaseReplicate).await?`
impl AsRef<DatabaseConnection> for DatabaseReplica {
fn as_ref(&self) -> &DatabaseConnection {
&self.0

@ -7,7 +7,7 @@ use derive_more::Constructor;
use ethers::prelude::{H256, U64};
use hashbrown::{HashMap, HashSet};
use itertools::{Itertools, MinMaxResult};
use log::{debug, trace, warn};
use log::{trace, warn};
use quick_cache_ttl::Cache;
use serde::Serialize;
use std::cmp::{Ordering, Reverse};