From 7947cb95ff105e98901b6a7ffa6f8ca89af7e9e4 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 31 May 2023 11:20:17 -0700 Subject: [PATCH] use bloom filters and support transactions with multiple deposit events --- Cargo.lock | 2 + web3_proxy/Cargo.toml | 3 + web3_proxy/src/app/mod.rs | 2 +- web3_proxy/src/errors.rs | 34 ++- web3_proxy/src/frontend/users/payment.rs | 321 ++++++++++------------- web3_proxy/src/relational_db.rs | 1 + web3_proxy/src/rpcs/consensus.rs | 2 +- 7 files changed, 179 insertions(+), 186 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e36a5a2b..0521ae1f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6499,6 +6499,7 @@ dependencies = [ "derive_more", "entities", "env_logger", + "ethbloom", "ethers", "ewma", "fdlimit", @@ -6535,6 +6536,7 @@ dependencies = [ "regex", "reqwest", "rmp-serde", + "rust_decimal", "sentry", "serde", "serde_json", diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 02491241..a751e96a 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -32,6 +32,7 @@ influxdb2-structmap = { git = "https://github.com/llamanodes/influxdb2/"} # TODO: import chrono from sea-orm so we always have the same version # TODO: make sure this time version matches siwe. PR to put this in their prelude # TODO: rdkafka has a tracing feature +# TODO: axum has a tracing feature # TODO: hdrhistogram for automated tiers @@ -47,6 +48,7 @@ console-subscriber = { version = "0.1.9", optional = true } counter = "0.5.7" derive_more = "0.99.17" env_logger = "0.10.0" +ethbloom = "0.13.0" ethers = { version = "2.0.6", default-features = false, features = ["rustls", "ws"] } ewma = "0.1.1" fdlimit = "0.2.1" @@ -77,6 +79,7 @@ rdkafka = { version = "0.31.0" } regex = "1.8.3" reqwest = { version = "0.11.18", default-features = false, features = ["deflate", "gzip", "json", "tokio-rustls"] } rmp-serde = "1.1.1" +rust_decimal = { version = "1.29.1", features = ["maths"] } sentry = { version = "0.31.3", default-features = false, features = ["backtrace", "contexts", "panic", "anyhow", "reqwest", "rustls", "log", "sentry-log"] } serde = { version = "1.0.163", features = [] } serde_json = { version = "1.0.96", default-features = false, features = ["alloc", "raw_value"] } diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 5edad18e..f2680bea 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -937,13 +937,13 @@ impl Web3ProxyApp { } /// this is way more round-a-bout than we want, but it means stats are emitted and caches are used - /// request_with_caching pub async fn authorized_request( self: &Arc, method: &str, params: P, authorization: Arc, ) -> Web3ProxyResult { + // TODO: proper ids let request = JsonRpcRequest::new(JsonRpcId::Number(1), method.to_string(), json!(params))?; let (_, response, _) = self.proxy_request(request, authorization, None).await; diff --git a/web3_proxy/src/errors.rs b/web3_proxy/src/errors.rs index a8338f55..43e16b4a 100644 --- a/web3_proxy/src/errors.rs +++ b/web3_proxy/src/errors.rs @@ -3,10 +3,7 @@ use crate::frontend::authorization::Authorization; use crate::jsonrpc::{JsonRpcErrorData, JsonRpcForwardedResponse}; use crate::response_cache::JsonRpcResponseEnum; - -use std::error::Error; -use std::{borrow::Cow, net::IpAddr}; - +use crate::rpcs::provider::EthersHttpProvider; use axum::{ headers, http::StatusCode, @@ -14,14 +11,18 @@ use axum::{ Json, }; use derive_more::{Display, Error, From}; +use ethers::prelude::ContractError; use http::header::InvalidHeaderValue; use ipnet::AddrParseError; use log::{debug, error, info, trace, warn}; use migration::sea_orm::DbErr; use redis_rate_limiter::redis::RedisError; use reqwest::header::ToStrError; +use rust_decimal::Error as DecimalError; use serde::Serialize; use serde_json::value::RawValue; +use std::error::Error; +use std::{borrow::Cow, net::IpAddr}; use tokio::{sync::AcquireError, task::JoinError, time::Instant}; pub type Web3ProxyResult = Result; @@ -34,6 +35,7 @@ impl From for Web3ProxyResult<()> { } } +// TODO: replace all String with `Cow<'static, str>` #[derive(Debug, Display, Error, From)] pub enum Web3ProxyError { AccessDenied, @@ -46,7 +48,9 @@ pub enum Web3ProxyError { #[from(ignore)] BadResponse(String), BadRouting, + Contract(ContractError), Database(DbErr), + Decimal(DecimalError), #[display(fmt = "{:#?}, {:#?}", _0, _1)] EipVerificationFailed(Box, Box), EthersHttpClient(ethers::prelude::HttpClientError), @@ -220,6 +224,28 @@ impl Web3ProxyError { }, ) } + Self::Contract(err) => { + warn!("Contract Error: {}", err); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcErrorData { + message: Cow::Owned(format!("contract error: {}", err)), + code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), + data: None, + }, + ) + } + Self::Decimal(err) => { + debug!("Decimal Error: {}", err); + ( + StatusCode::BAD_REQUEST, + JsonRpcErrorData { + message: Cow::Owned(format!("decimal error: {}", err)), + code: StatusCode::BAD_REQUEST.as_u16().into(), + data: None, + }, + ) + } Self::EipVerificationFailed(err_1, err_191) => { info!( "EipVerificationFailed err_1={:#?} err2={:#?}", diff --git a/web3_proxy/src/frontend/users/payment.rs b/web3_proxy/src/frontend/users/payment.rs index 75c7e44e..c2039c06 100644 --- a/web3_proxy/src/frontend/users/payment.rs +++ b/web3_proxy/src/frontend/users/payment.rs @@ -1,6 +1,6 @@ use crate::app::Web3ProxyApp; use crate::errors::{Web3ProxyError, Web3ProxyResponse}; -use anyhow::{anyhow, Context}; +use anyhow::Context; use axum::{ extract::Path, headers::{authorization::Bearer, Authorization}, @@ -8,29 +8,29 @@ use axum::{ Extension, Json, TypedHeader, }; use axum_macros::debug_handler; -use entities::{balance, increase_on_chain_balance_receipt, user, user_tier}; +use entities::{balance, increase_on_chain_balance_receipt, user}; +use ethbloom::Input as BloomInput; use ethers::abi::{AbiEncode, ParamType}; use ethers::prelude::abigen; use ethers::types::{Address, TransactionReceipt, H256, U256}; use hashbrown::HashMap; -// use http::StatusCode; +use http::StatusCode; use log::{debug, info, trace, warn}; -// use migration::sea_orm; -// use migration::sea_orm::prelude::Decimal; -// use migration::sea_orm::ActiveModelTrait; -use migration::sea_orm::ColumnTrait; -use migration::sea_orm::EntityTrait; -// use migration::sea_orm::IntoActiveModel; -use migration::sea_orm::QueryFilter; -// use migration::sea_orm::TransactionTrait; +use migration::sea_orm::prelude::Decimal; +use migration::sea_orm::{ + self, ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, + TransactionTrait, +}; +use num_traits::Pow; use serde_json::json; +use std::str::FromStr; use std::sync::Arc; // TODO: do this in a build.rs so that the editor autocomplete and docs are better abigen!( IERC20, r#"[ - decimals() -> uint256 + decimals() external view returns (uint256) event Transfer(address indexed from, address indexed to, uint256 value) event Approval(address indexed owner, address indexed spender, uint256 value) ]"#, @@ -40,8 +40,8 @@ abigen!( PaymentFactory, r#"[ event PaymentReceived(address indexed account, address token, uint256 amount) - account_to_payment_address(address) -> address - payment_address_to_account(address) -> address + account_to_payment_address(address) external view returns (address) + payment_address_to_account(address) external view returns (address) ]"#, ); @@ -110,6 +110,7 @@ pub async fn user_deposits_get( out.insert("amount", serde_json::Value::String(x.amount.to_string())); out.insert("chain_id", serde_json::Value::Number(x.chain_id.into())); out.insert("tx_hash", serde_json::Value::String(x.tx_hash)); + // TODO: log_index out }) .collect::>(); @@ -158,6 +159,7 @@ pub async fn user_balance_post( .await? .is_some() { + // this will be status code 200, not 204 let response = Json(json!({ "result": "success", "message": "this transaction was already in the database", @@ -168,17 +170,12 @@ pub async fn user_balance_post( } // get the transaction receipt - let transaction_receipt: Option = app - .internal_request("eth_getTransactionReceipt", (tx_hash,)) - .await?; - - let transaction_receipt = if let Some(transaction_receipt) = transaction_receipt { - transaction_receipt - } else { - return Err(Web3ProxyError::BadRequest( + let transaction_receipt = app + .internal_request::<_, Option>("eth_getTransactionReceipt", (tx_hash,)) + .await? + .ok_or(Web3ProxyError::BadRequest( format!("transaction receipt not found for {}", tx_hash,).into(), - )); - }; + ))?; trace!("Transaction receipt: {:#?}", transaction_receipt); @@ -192,76 +189,56 @@ pub async fn user_balance_post( let payment_factory = PaymentFactory::new(payment_factory_address, app.internal_provider().clone()); - // there is no need to check accepted tokens. the smart contract already reverts if the token isn't accepted + // TODO: this should be in the abigen stuff somewhere + // let payment_factory_deposit_topic = payment_factory.something?; + let payment_factory_deposit_topic = app + .config + .deposit_topic + .context("A deposit_topic must be provided in the config to parse payments")?; - // let deposit_log = payment_factory.something?; + let bloom_input = BloomInput::Raw(payment_factory_deposit_topic.as_bytes()); - // // TODO: do a quick check that this transaction contains the required log - // if !transaction_receipt.logs_bloom.contains_input(deposit_log) { - // return Err(Web3ProxyError::BadRequest("no matching logs found".into())); - // } + // do a quick check that this transaction contains the required log + if !transaction_receipt.logs_bloom.contains_input(bloom_input) { + return Err(Web3ProxyError::BadRequest("no matching logs found".into())); + } + + let mut response_data = vec![]; + + let txn = db_conn.begin().await?; // parse the logs from the transaction receipt // there might be multiple logs with the event if the transaction is doing things in bulk // TODO: change the indexes to be unique on (chain, txhash, log_index) for log in transaction_receipt.logs { + // TODO: use abigen to make this simpler? if log.address != payment_factory_address { - continue; - } - // TODO: check the log topic matches our factory - // TODO: check the log send matches our factory - - let log_index = log - .log_index - .context("no log_index. transaction must not be confirmed")?; - - // TODO: get the payment token address out of the event - let payment_token_address = Address::zero(); - - // TODO: get the payment token amount out of the event (wei = the integer unit) - let payment_token_wei = U256::zero(); - - let payment_token = IERC20::new(payment_token_address, app.internal_provider().clone()); - - // TODO: get the account the payment was received on behalf of (any account could have sent it) - let on_behalf_of_address = Address::zero(); - - // get the decimals for the token - let payment_token_decimals = payment_token.decimals().call().await; - - todo!("now what?"); - } - - todo!("now what?"); - /* - - for log in transaction_receipt.logs { - if log.address != deposit_contract { - debug!( - "Out: Log is not relevant, as it is not directed to the deposit contract {:?} {:?}", - format!("{:?}", log.address), - deposit_contract + trace!( + "Out: Address is not relevant: {:?} {:?}", + log.address, + payment_factory_address, ); continue; } - // Get the topics out - let topic: H256 = log.topics.get(0).unwrap().to_owned(); - if topic != deposit_topic { - debug!( + // TODO: use abigen to make this simpler? + let topic = log.topics.get(0).unwrap(); + if *topic != payment_factory_deposit_topic { + trace!( "Out: Topic is not relevant: {:?} {:?}", - topic, deposit_topic + topic, + payment_factory_deposit_topic, ); continue; } - // TODO: Will this work? Depends how logs are encoded - let (recipient_account, token, amount): (Address, Address, U256) = match ethers::abi::decode( - &[ - ParamType::Address, - ParamType::Address, - ParamType::Uint(256usize), - ], + // TODO: use abigen to make this simpler + let (recipient_account, payment_token_address, payment_token_wei): ( + Address, + Address, + U256, + ) = match ethers::abi::decode( + &[ParamType::Address, ParamType::Address, ParamType::Uint(256)], &log.data, ) { Ok(tpl) => ( @@ -282,132 +259,116 @@ pub async fn user_balance_post( .context("Could not decode amount")?, ), Err(err) => { - warn!("Out: Could not decode! {:?}", err); + trace!("Out: Could not decode! {:?}", err); continue; } }; - // return early if amount is 0 - if amount == U256::from(0) { - warn!( - "Out: Found log has amount = 0 {:?}. This should never be the case according to the smart contract", - amount - ); - continue; - } + // there is no need to check that payment_token_address is an allowed token + // the smart contract already reverts if the token isn't accepted + + // we used to skip here if amount is 0, but that means the txid wouldn't ever show up in the database which could be confusing + // also, the contract already reverts for 0 value + + let log_index = log + .log_index + .context("no log_index. transaction must not be confirmed")?; + + // the internal provider will handle caching + let payment_token = IERC20::new(payment_token_address, app.internal_provider().clone()); + + // get the decimals for the token + let payment_token_decimals = payment_token.decimals().call().await?; + + // TODO: how should we do U256 to Decimal? + let decimal_shift = Decimal::from(10).pow(payment_token_decimals.as_u64()); + + let mut payment_token_amount = + Decimal::from_str(&format!("{}", payment_token_wei)).unwrap(); + payment_token_amount.set_scale(payment_token_decimals.as_u32())?; + payment_token_amount /= decimal_shift; info!( "Found deposit transaction for: {:?} {:?} {:?}", - recipient_account, token, amount + recipient_account, payment_token_address, payment_token_amount ); // Encoding is inefficient, revisit later let recipient = match user::Entity::find() .filter(user::Column::Address.eq(recipient_account.encode_hex())) - .one(db_replica.as_ref()) + .one(&db_conn) .await? { - Some(x) => Ok(x), - None => Err(Web3ProxyError::BadRequest( - "The user must have signed up first. They are currently not signed up!".to_string(), - )), - }?; + Some(x) => x, + None => todo!("make their account"), + }; - // For now we only accept stablecoins - // And we hardcode the peg (later we would have to depeg this, for example - // 1$ = Decimal(1) for any stablecoin - // TODO: Let's assume that people don't buy too much at _once_, we do support >$1M which should be fine for now - debug!("Arithmetic is: {:?} {:?}", amount, decimals); - debug!( - "Decimals arithmetic is: {:?} {:?}", - Decimal::from(amount.as_u128()), - Decimal::from(10_u64.pow(decimals)) - ); - let mut amount = Decimal::from(amount.as_u128()); - let _ = amount.set_scale(decimals); - debug!("Amount is: {:?}", amount); + // For now we only accept stablecoins + // And we hardcode the peg (later we would have to depeg this, for example + // 1$ = Decimal(1) for any stablecoin + // TODO: Let's assume that people don't buy too much at _once_, we do support >$1M which should be fine for now + debug!( + "Arithmetic is: {:?} / 10 ^ {:?} = {:?}", + payment_token_wei, payment_token_decimals, payment_token_amount + ); - // Check if the item is in the database. If it is not, then add it into the database - let user_balance = balance::Entity::find() - .filter(balance::Column::UserId.eq(recipient.id)) - .one(&db_conn) - .await?; + // Check if the item is in the database. If it is not, then add it into the database + // TODO: select ... for update + let user_balance = balance::Entity::find() + .filter(balance::Column::UserId.eq(recipient.id)) + .one(&txn) + .await?; - // Get the premium user-tier - let premium_user_tier = user_tier::Entity::find() - .filter(user_tier::Column::Title.eq("Premium")) - .one(&db_conn) - .await? - .context("Could not find 'Premium' Tier in user-database")?; + match user_balance { + Some(user_balance) => { + // Update the entry, adding the balance + let balance_plus_amount = user_balance.available_balance + payment_token_amount; - let txn = db_conn.begin().await?; - match user_balance { - Some(user_balance) => { - let balance_plus_amount = user_balance.available_balance + amount; - info!("New user balance is: {:?}", balance_plus_amount); - // Update the entry, adding the balance - let mut active_user_balance = user_balance.into_active_model(); - active_user_balance.available_balance = sea_orm::Set(balance_plus_amount); + let mut active_user_balance = user_balance.into_active_model(); + active_user_balance.available_balance = sea_orm::Set(balance_plus_amount); - if balance_plus_amount >= Decimal::new(10, 0) { - // Also make the user premium at this point ... - let mut active_recipient = recipient.clone().into_active_model(); - // Make the recipient premium "Effectively Unlimited" - active_recipient.user_tier_id = sea_orm::Set(premium_user_tier.id); - active_recipient.save(&txn).await?; - } + debug!("New user balance: {:?}", active_user_balance); + active_user_balance.save(&txn).await?; + } + None => { + // Create the entry with the respective balance + let active_user_balance = balance::ActiveModel { + available_balance: sea_orm::ActiveValue::Set(payment_token_amount), + user_id: sea_orm::ActiveValue::Set(recipient.id), + ..Default::default() + }; - debug!("New user balance model is: {:?}", active_user_balance); - active_user_balance.save(&txn).await?; - // txn.commit().await?; - // user_balance - } - None => { - // Create the entry with the respective balance - let active_user_balance = balance::ActiveModel { - available_balance: sea_orm::ActiveValue::Set(amount), - user_id: sea_orm::ActiveValue::Set(recipient.id), - ..Default::default() - }; + debug!("New user balance: {:?}", active_user_balance); + active_user_balance.save(&txn).await?; + } + }; - if amount >= Decimal::new(10, 0) { - // Also make the user premium at this point ... - let mut active_recipient = recipient.clone().into_active_model(); - // Make the recipient premium "Effectively Unlimited" - active_recipient.user_tier_id = sea_orm::Set(premium_user_tier.id); - active_recipient.save(&txn).await?; - } + debug!("Setting tx_hash: {:?}", tx_hash); + let receipt = increase_on_chain_balance_receipt::ActiveModel { + tx_hash: sea_orm::ActiveValue::Set(tx_hash.encode_hex()), + chain_id: sea_orm::ActiveValue::Set(app.config.chain_id), + // TODO: log_index + amount: sea_orm::ActiveValue::Set(payment_token_amount), + deposit_to_user_id: sea_orm::ActiveValue::Set(recipient.id), + ..Default::default() + }; - info!("New user balance model is: {:?}", active_user_balance); - active_user_balance.save(&txn).await?; - // txn.commit().await?; - // user_balance // .try_into_model().unwrap() - } - }; - debug!("Setting tx_hash: {:?}", tx_hash); - let receipt = increase_on_chain_balance_receipt::ActiveModel { - tx_hash: sea_orm::ActiveValue::Set(tx_hash.encode_hex()), - chain_id: sea_orm::ActiveValue::Set(app.config.chain_id), - amount: sea_orm::ActiveValue::Set(amount), - deposit_to_user_id: sea_orm::ActiveValue::Set(recipient.id), - ..Default::default() - }; + receipt.save(&txn).await?; - receipt.save(&txn).await?; - txn.commit().await?; - debug!("Saved to db"); + let x = json!({ + "tx_hash": tx_hash, + "log_index": log_index, + "token": payment_token_address, + "amount": payment_token_amount, + }); - let response = ( - StatusCode::CREATED, - Json(json!({ - "tx_hash": tx_hash, - "amount": amount - })), - ) - .into_response(); + response_data.push(x); + } - // Return early if the log was added, assume there is at most one valid log per transaction - return Ok(response); - } - */ + txn.commit().await?; + debug!("Saved to db"); + + let response = (StatusCode::CREATED, Json(json!(response_data))).into_response(); + + Ok(response) } diff --git a/web3_proxy/src/relational_db.rs b/web3_proxy/src/relational_db.rs index d1ccb581..a5999a19 100644 --- a/web3_proxy/src/relational_db.rs +++ b/web3_proxy/src/relational_db.rs @@ -14,6 +14,7 @@ pub use migration::sea_orm::DatabaseConnection; #[derive(Clone, From)] pub struct DatabaseReplica(DatabaseConnection); +// TODO: this still doesn't work like i want. I want to be able to do `query.one(&DatabaseReplicate).await?` impl AsRef for DatabaseReplica { fn as_ref(&self) -> &DatabaseConnection { &self.0 diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index a9f8397c..ed7073d9 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -7,7 +7,7 @@ use derive_more::Constructor; use ethers::prelude::{H256, U64}; use hashbrown::{HashMap, HashSet}; use itertools::{Itertools, MinMaxResult}; -use log::{debug, trace, warn}; +use log::{trace, warn}; use quick_cache_ttl::Cache; use serde::Serialize; use std::cmp::{Ordering, Reverse};