From 63eb2c43912f67459e49bd9461329d7f3f3befd2 Mon Sep 17 00:00:00 2001 From: yenicelik Date: Thu, 8 Jun 2023 12:14:28 +0200 Subject: [PATCH 1/4] removed bloom filter temporarily, added some fixes with decimals in payment.rs --- .../src/increase_on_chain_balance_receipt.rs | 1 - ...14_134254_increase_balance_transactions.rs | 3 +- scripts/101-balance-referral-stats.sh | 111 ++++++++++++++++++ web3_proxy/src/frontend/users/payment.rs | 63 ++++++---- 4 files changed, 152 insertions(+), 26 deletions(-) create mode 100644 scripts/101-balance-referral-stats.sh diff --git a/entities/src/increase_on_chain_balance_receipt.rs b/entities/src/increase_on_chain_balance_receipt.rs index 24acfd43..2e4821c7 100644 --- a/entities/src/increase_on_chain_balance_receipt.rs +++ b/entities/src/increase_on_chain_balance_receipt.rs @@ -8,7 +8,6 @@ use serde::{Deserialize, Serialize}; pub struct Model { #[sea_orm(primary_key)] pub id: i32, - #[sea_orm(unique)] pub tx_hash: String, pub chain_id: u64, #[sea_orm(column_type = "Decimal(Some((20, 10)))")] diff --git a/migration/src/m20230214_134254_increase_balance_transactions.rs b/migration/src/m20230214_134254_increase_balance_transactions.rs index 2de3db9a..5354e605 100644 --- a/migration/src/m20230214_134254_increase_balance_transactions.rs +++ b/migration/src/m20230214_134254_increase_balance_transactions.rs @@ -26,7 +26,7 @@ impl MigrationTrait for Migration { ) .col( ColumnDef::new(IncreaseOnChainBalanceReceipt::ChainId) - .big_integer() + .big_unsigned() .not_null(), ) .col( @@ -37,7 +37,6 @@ impl MigrationTrait for Migration { .col( ColumnDef::new(IncreaseOnChainBalanceReceipt::DepositToUserId) .big_unsigned() - .unique_key() .not_null(), ) .foreign_key( diff --git a/scripts/101-balance-referral-stats.sh b/scripts/101-balance-referral-stats.sh new file mode 100644 index 00000000..33fee68b --- /dev/null +++ b/scripts/101-balance-referral-stats.sh @@ -0,0 +1,111 @@ +################## +# Run the server +################## + +# Keep the proxyd instance running the background (and test that it works) +cargo run --release -- proxyd + +# Check if the instance is running +curl -X POST -H "Content-Type: application/json" --data '{"jsonrpc":"2.0","method":"web3_clientVersion","id":1}' 127.0.0.1:8544 + +################## +# Login 2 users +################## +http://127.0.0.1:8544/user/login/0xeB3E928A2E54BE013EF8241d4C9EaF4DfAE94D5a +https://www.myetherwallet.com/wallet/sign + +# Use this site to sign a message +curl -X POST http://127.0.0.1:8544/user/login \ + -H 'Content-Type: application/json' \ + -d '{ + "address": "0xeb3e928a2e54be013ef8241d4c9eaf4dfae94d5a", + "msg": "0x6c6c616d616e6f6465732e636f6d2077616e747320796f7520746f207369676e20696e207769746820796f757220457468657265756d206163636f756e743a0a3078654233453932384132453534424530313345463832343164344339456146344466414539344435610a0af09fa699f09fa699f09fa699f09fa699f09fa6990a0a5552493a2068747470733a2f2f6c6c616d616e6f6465732e636f6d2f0a56657273696f6e3a20310a436861696e2049443a20310a4e6f6e63653a20303148324435424e47584d423539394e3952314a3148575a444e0a4973737565642041743a20323032332d30362d30385430393a32383a31362e36363939335a0a45787069726174696f6e2054696d653a20323032332d30362d30385430393a34383a31362e36363939335a", + "sig": "cb85bccbc12619a5b492817ae26f1daea1aff921c5a404c190c5cd371afda9cc299f2685e97c7ca996ab8c2cbb74b8e115a24251bd09a393b628490df65793161b", + "version": "3", + "signer": "MEW" + }' + +# Keys are: +# Bearer: 01H2D5CAQJF7P80222P4ZAFQ26 +# RPC: 01H2D5CAP1KF2NKRS30SGATDSD + +# Now log in the other user using a referral code +# Retrieve the referral link +curl \ +-H "Authorization: Bearer 01H2D5CAQJF7P80222P4ZAFQ26" \ +-X GET "127.0.0.1:8544/user/referral" + +# Referral Code: 01H2D5CMPQKG36WTW1TM21SG28 + + +http://127.0.0.1:8544/user/login/0x762390ae7a3c4D987062a398C1eA8767029AB08E +https://www.myetherwallet.com/wallet/sign +curl -X POST http://127.0.0.1:8544/user/login \ + -H 'Content-Type: application/json' \ + -d '{ + "address": "0x762390ae7a3c4d987062a398c1ea8767029ab08e", + "msg": "0x6c6c616d616e6f6465732e636f6d2077616e747320796f7520746f207369676e20696e207769746820796f757220457468657265756d206163636f756e743a0a3078373632333930616537613363344439383730363261333938433165413837363730323941423038450a0af09fa699f09fa699f09fa699f09fa699f09fa6990a0a5552493a2068747470733a2f2f6c6c616d616e6f6465732e636f6d2f0a56657273696f6e3a20310a436861696e2049443a20310a4e6f6e63653a20303148324435435a314150414e5a505950594d5735415945474d0a4973737565642041743a20323032332d30362d30385430393a32383a35392e3137383533385a0a45787069726174696f6e2054696d653a20323032332d30362d30385430393a34383a35392e3137383533385a", + "sig": "af69a53b860bd327c6bb1299439aec70c330aaf9e26cd92c02f4bc2b48b4ebe74fd61b4388ba6ac6a6b83a5e313560d8bbd7877d9eeb416504a3dadca50113621c", + "version": "3", + "signer": "MEW", + "referral_code": "01H2D5CMPQKG36WTW1TM21SG28" + }' + +# Keys are: +# Bearer: 01H2D5DN564M4Q2T6PETEZY83Q +# RPC: 01H2D5DN4D423VR2KFWBZE46TR + +# Check if the referral entry has been made in the database: checked + +# Make a deposit transaction for both parties, and mark them (it does not matter who calls the endpoints) +curl \ +-H "Authorization: Bearer 01H2D5DN564M4Q2T6PETEZY83Q" \ +-X GET "127.0.0.1:8544/user/balance/0x749788a5766577431a0a4fc8721fd7cb981f55222e073ed17976f0aba5e8818a" + +curl \ +-H "Authorization: Bearer 01H2D5DN564M4Q2T6PETEZY83Q" \ +-X GET "127.0.0.1:8544/user/balance/0xd56dee328dfa3bea26c3762834081881e5eff62e77a2b45e72d98016daaeffba" + +curl \ +-H "Authorization: Bearer 01H2D5DN564M4Q2T6PETEZY83Q" \ +-X GET "127.0.0.1:8544/user/balance/0xda41f748106d2d1f1bf395e65d07bd9fc507c1eb4fd50c87d8ca1f34cfd536b0" + +curl \ +-H "Authorization: Bearer 01H2D5DN564M4Q2T6PETEZY83Q" \ +-X GET "127.0.0.1:8544/user/balance/0x12b38f3456ccb687ead8386c33071bedc23360931b9be672bb444b7ee1927bbe" +# This throws an error, because the amount that's provided is 10^(18+12). With USDC (18 decimal points), this would be above 1BN dollars. +# I suppose we won't accept shitcoins anytime soone. + +curl \ +-H "Authorization: Bearer 01H2D5DN564M4Q2T6PETEZY83Q" \ +-X GET "127.0.0.1:8544/user/balance/0x81022efe36564d737af223e06b9a6c62f29ad7ce2f85dd99f1ea2f2e9a73306e" + + +# Check the balance for both users now +curl \ +-H "Authorization: Bearer 01H2D5DN564M4Q2T6PETEZY83Q" \ +-X GET "127.0.0.1:8544/user/balance" + +curl \ +-H "Authorization: Bearer 01H2D5CAQJF7P80222P4ZAFQ26" \ +-X GET "127.0.0.1:8544/user/balance" + +# Now we check if the referrals are triggered ... + +# Referred user makes some requests +for i in {1..10000} +do + curl \ + -X POST "127.0.0.1:8544/rpc/01H2D5DN4D423VR2KFWBZE46TR" \ + -H "Content-Type: application/json" \ + --data '{"method":"eth_blockNumber","params":[],"id":1,"jsonrpc":"2.0"}' +done + +# Let's simultaneously also let the referrer make some requests, to make sure deadlocks do not occur +for i in {1..10000} +do + curl \ + -X POST "127.0.0.1:8544/rpc/01H2D5CAP1KF2NKRS30SGATDSD" \ + -H "Content-Type: application/json" \ + --data '{"method":"eth_blockNumber","params":[],"id":1,"jsonrpc":"2.0"}' +done diff --git a/web3_proxy/src/frontend/users/payment.rs b/web3_proxy/src/frontend/users/payment.rs index 6f3ee818..4ae27296 100644 --- a/web3_proxy/src/frontend/users/payment.rs +++ b/web3_proxy/src/frontend/users/payment.rs @@ -168,19 +168,32 @@ pub async fn user_balance_post( let payment_factory_contract = PaymentFactory::new(payment_factory_address, app.internal_provider().clone()); - // check bloom filter to be sure this transaction contains any relevant logs - if let Some(ValueOrArray::Value(Some(x))) = payment_factory_contract - .payment_received_filter() - .filter - .topics[0] - { - let bloom_input = BloomInput::Hash(x.as_fixed_bytes()); + debug!( + "Payment Factor Filter is: {:?}", + payment_factory_contract.payment_received_filter() + ); - // do a quick check that this transaction contains the required log - if !transaction_receipt.logs_bloom.contains_input(bloom_input) { - return Err(Web3ProxyError::BadRequest("no matching logs found".into())); - } - } + // check bloom filter to be sure this transaction contains any relevant logs + // TODO: This does not work properly right now, get back this eventually + // if let Some(ValueOrArray::Value(Some(x))) = payment_factory_contract + // .payment_received_filter() + // .filter + // .topics[0] + // { + // debug!("Bloom input bytes is: {:?}", x); + // debug!("Bloom input bytes is: {:?}", x..as_fixed_bytes()); + // debug!("Bloom input as hex is: {:?}", hex!(x)); + // let bloom_input = BloomInput::Raw(hex!(x)); + // debug!( + // "Transaction receipt logs_bloom: {:?}", + // transaction_receipt.logs_bloom + // ); + // + // // do a quick check that this transaction contains the required log + // if !transaction_receipt.logs_bloom.contains_input(x) { + // return Err(Web3ProxyError::BadRequest("no matching logs found".into())); + // } + // } // the transaction might contain multiple relevant logs. collect them all let mut response_data = vec![]; @@ -195,7 +208,6 @@ pub async fn user_balance_post( } // Create a new transaction that will be used for joint transaction - let txn = db_conn.begin().await?; if let Ok(event) = payment_factory_contract .decode_event::( "PaymentReceived", @@ -224,26 +236,29 @@ pub async fn user_balance_post( // hopefully u32 is always enough, because the Decimal crate doesn't accept a larger scale // uses uint8, but i've seen pretty much every int in practice let payment_token_decimals = payment_token.decimals().call().await?.as_u32(); - - let decimal_shift = Decimal::from(10).pow(payment_token_decimals as u64); - let mut payment_token_amount = Decimal::from_str_exact(&payment_token_wei.to_string())?; + // Setting the scale already does the decimal shift, no need to divide a second time payment_token_amount.set_scale(payment_token_decimals)?; - payment_token_amount /= decimal_shift; info!( - "Found deposit transaction for: {:?} {:?} {:?}", - recipient_account, payment_token_address, payment_token_amount + "Found deposit transaction for: {:?} {:?} {:?} {:?}", + &recipient_account.to_fixed_bytes(), + recipient_account, + payment_token_address, + payment_token_amount ); let recipient = match user::Entity::find() - .filter(user::Column::Address.eq(recipient_account.encode_hex())) + .filter(user::Column::Address.eq(recipient_account.to_fixed_bytes().as_slice())) .one(&db_conn) .await? { - Some(x) => x, - None => todo!("make their account"), - }; + Some(x) => Ok(x), + None => { + // todo!("make their account"); + Err(Web3ProxyError::AccessDenied) + } + }?; // For now we only accept stablecoins // And we hardcode the peg (later we would have to depeg this, for example @@ -261,6 +276,7 @@ pub async fn user_balance_post( user_id: sea_orm::Set(recipient.id), ..Default::default() }; + info!("Trying to insert into balance entry: {:?}", balance_entry); balance::Entity::insert(balance_entry) .on_conflict( OnConflict::new() @@ -283,6 +299,7 @@ pub async fn user_balance_post( deposit_to_user_id: sea_orm::ActiveValue::Set(recipient.id), ..Default::default() }; + info!("Trying to insert receipt {:?}", receipt); receipt.save(&txn).await?; From e35629dc29e4fa475268ef156812e3164228152e Mon Sep 17 00:00:00 2001 From: yenicelik Date: Thu, 8 Jun 2023 23:57:46 +0200 Subject: [PATCH 2/4] balance is inside the influx query again --- scripts/generate-requests-and-stats.sh | 4 +++- .../101-balance-referral-stats.sh | 6 ++++++ web3_proxy/src/stats/influxdb_queries.rs | 21 ++++++++++++++++--- 3 files changed, 27 insertions(+), 4 deletions(-) rename scripts/{ => manual-tests}/101-balance-referral-stats.sh (96%) diff --git a/scripts/generate-requests-and-stats.sh b/scripts/generate-requests-and-stats.sh index a05e55e7..77ae8f3a 100644 --- a/scripts/generate-requests-and-stats.sh +++ b/scripts/generate-requests-and-stats.sh @@ -5,4 +5,6 @@ # https://github.com/INFURA/versus # ./ethspam | ./versus --stop-after 100 "http://localhost:8544/" # Pipe into the endpoint ..., add a bearer token and all that -./ethspam http://127.0.0.1:8544/rpc/01H0ZZJDNNEW49FRFS4D9SPR8B | ./versus --concurrency=4 --stop-after 100 http://localhost:8544/rpc/01H0ZZJDNNEW49FRFS4D9SPR8B +./ethspam http://127.0.0.1:8544/rpc/01H2D5DN4D423VR2KFWBZE46TR | ./versus --concurrency=4 --stop-after 10000 http://localhost:8544/rpc/01H2D5DN4D423VR2KFWBZE46TR + +./ethspam http://127.0.0.1:8544/rpc/01H2D5CAP1KF2NKRS30SGATDSD | ./versus --concurrency=4 --stop-after 10000 http://localhost:8544/rpc/01H2D5CAP1KF2NKRS30SGATDSD diff --git a/scripts/101-balance-referral-stats.sh b/scripts/manual-tests/101-balance-referral-stats.sh similarity index 96% rename from scripts/101-balance-referral-stats.sh rename to scripts/manual-tests/101-balance-referral-stats.sh index 942d99fa..40b4fbba 100644 --- a/scripts/101-balance-referral-stats.sh +++ b/scripts/manual-tests/101-balance-referral-stats.sh @@ -128,3 +128,9 @@ curl \ curl \ -H "Authorization: Bearer 01H2D5CAQJF7P80222P4ZAFQ26" \ -X GET "127.0.0.1:8544/user/referral/stats/shared-codes" + + +# Finally also get some stats +curl -X GET \ +-H "Authorization: Bearer 01H2D5DN564M4Q2T6PETEZY83Q" \ +"http://localhost:8544/user/stats/detailed?query_start=1686236378&query_window_seconds=3600" diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs index fe3d966b..b303d40d 100644 --- a/web3_proxy/src/stats/influxdb_queries.rs +++ b/web3_proxy/src/stats/influxdb_queries.rs @@ -174,8 +174,8 @@ pub async fn query_user_stats<'a>( |> filter(fn: (r) => r["_measurement"] == "{measurement}") {filter_chain_id} {drop_method} - - base + + cumsum = base |> aggregateWindow(every: {query_window_seconds}s, fn: sum, createEmpty: false) |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") |> drop(columns: ["balance"]) @@ -186,7 +186,22 @@ pub async fn query_user_stats<'a>( |> sort(columns: ["frontend_requests"], desc: true) |> limit(n: 1) |> group() - |> sort(columns: ["_time", "_measurement", "archive_needed", "chain_id", "error_response", "method", "rpc_secret_key_id"], desc: true) + + balance = base + |> toFloat() + |> aggregateWindow(every: {query_window_seconds}s, fn: mean, createEmpty: false) + |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") + |> group(columns: ["_time", "_measurement", "chain_id", "method", "rpc_secret_key_id"]) + |> mean(column: "balance") + |> group() + |> sort(columns: ["_time", "_measurement", "chain_id", "method", "rpc_secret_key_id"], desc: true) + + join( + tables: {{cumsum, balance}}, + on: ["_time", "_measurement", "chain_id", "method", "rpc_secret_key_id"] + ) + |> sort(columns: ["_time", "_measurement", "chain_id", "method", "rpc_secret_key_id"], desc: true) + "#); debug!("Raw query to db is: {:#?}", query); From ee6b83b31146f029700dc484ba0d539876bb5458 Mon Sep 17 00:00:00 2001 From: yenicelik Date: Fri, 9 Jun 2023 00:03:37 +0200 Subject: [PATCH 3/4] fixed aggregate --- web3_proxy/src/stats/influxdb_queries.rs | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs index b303d40d..97dff7be 100644 --- a/web3_proxy/src/stats/influxdb_queries.rs +++ b/web3_proxy/src/stats/influxdb_queries.rs @@ -167,6 +167,23 @@ pub async fn query_user_stats<'a>( StatType::Detailed => "".to_string(), }; + let join_candidates = match stat_response_type { + StatType::Aggregated => f!( + r#"{:?}"#, + vec!["_time", "_measurement", "chain_id", "rpc_secret_key_id"] + ), + StatType::Detailed => f!( + r#"{:?}"#, + vec![ + "_time", + "_measurement", + "method", + "chain_id", + "rpc_secret_key_id" + ] + ), + }; + let query = f!(r#" base = from(bucket: "{bucket}") |> range(start: {query_start}, stop: {query_stop}) @@ -198,10 +215,9 @@ pub async fn query_user_stats<'a>( join( tables: {{cumsum, balance}}, - on: ["_time", "_measurement", "chain_id", "method", "rpc_secret_key_id"] + on: {join_candidates} ) - |> sort(columns: ["_time", "_measurement", "chain_id", "method", "rpc_secret_key_id"], desc: true) - + |> sort(columns: ["_time", "_measurement", "chain_id", "method", "rpc_secret_key_id"], desc: true) "#); debug!("Raw query to db is: {:#?}", query); From 2333b8bbb653e5b9219bb1d5e8497b1ac0d024fc Mon Sep 17 00:00:00 2001 From: yenicelik Date: Sun, 11 Jun 2023 17:57:02 +0200 Subject: [PATCH 4/4] influx query is not 20-40% faster --- web3_proxy/src/stats/influxdb_queries.rs | 39 ++++++++---------------- 1 file changed, 13 insertions(+), 26 deletions(-) diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs index dd0dc37b..d388d44c 100644 --- a/web3_proxy/src/stats/influxdb_queries.rs +++ b/web3_proxy/src/stats/influxdb_queries.rs @@ -167,57 +167,44 @@ pub async fn query_user_stats<'a>( StatType::Detailed => "".to_string(), }; - let join_candidates = match stat_response_type { - StatType::Aggregated => f!( - r#"{:?}"#, - vec!["_time", "_measurement", "chain_id", "rpc_secret_key_id"] - ), - StatType::Detailed => f!( + let join_candidates = f!( r#"{:?}"#, vec![ "_time", "_measurement", - "method", "chain_id", - "rpc_secret_key_id" + // "rpc_secret_key_id" ] - ), - }; + ); let query = f!(r#" base = from(bucket: "{bucket}") |> range(start: {query_start}, stop: {query_stop}) - {rpc_key_filter} - |> filter(fn: (r) => r["_measurement"] == "{measurement}") - {filter_chain_id} {drop_method} + {rpc_key_filter} + {filter_chain_id} + |> filter(fn: (r) => r["_measurement"] == "{measurement}") - cumsum = base + cumsum = base + |> filter(fn: (r) => r["_field"] != "balance") + |> group(columns: ["_field", "_measurement", "archive_needed", "chain_id", "error_response", "method", "rpc_secret_key_id"]) |> aggregateWindow(every: {query_window_seconds}s, fn: sum, createEmpty: false) + |> drop(columns: ["_start", "_stop"]) |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") - |> drop(columns: ["balance"]) - |> group(columns: ["_time", "_measurement", "archive_needed", "chain_id", "error_response", "method", "rpc_secret_key_id"]) - |> sort(columns: ["frontend_requests"]) - |> map(fn:(r) => ({{ r with "sum_credits_used": float(v: r["sum_credits_used"]) }})) - |> cumulativeSum(columns: ["backend_requests", "cache_hits", "cache_misses", "frontend_requests", "sum_credits_used", "sum_request_bytes", "sum_response_bytes", "sum_response_millis"]) - |> sort(columns: ["frontend_requests"], desc: true) - |> limit(n: 1) |> group() balance = base - |> toFloat() + |> filter(fn: (r) => r["_field"] == "balance") + |> group(columns: ["_field", "_measurement", "chain_id"]) |> aggregateWindow(every: {query_window_seconds}s, fn: mean, createEmpty: false) + |> drop(columns: ["_start", "_stop"]) |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") - |> group(columns: ["_time", "_measurement", "chain_id", "method", "rpc_secret_key_id"]) - |> mean(column: "balance") |> group() - |> sort(columns: ["_time", "_measurement", "chain_id", "method", "rpc_secret_key_id"], desc: true) join( tables: {{cumsum, balance}}, on: {join_candidates} ) - |> sort(columns: ["_time", "_measurement", "chain_id", "method", "rpc_secret_key_id"], desc: true) "#); debug!("Raw query to db is: {:#?}", query);