From 1d22291737681b2372f6eb03f6558669ffd45e08 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 28 Oct 2022 06:38:21 +0000 Subject: [PATCH] wip. add user tiers --- Cargo.lock | 2 +- TODO.md | 17 +- migration/Cargo.toml | 2 +- migration/src/lib.rs | 2 + migration/src/m20221027_002407_user_tiers.rs | 172 +++++++++++++++++++ web3_proxy/src/user_token.rs | 21 +++ 6 files changed, 206 insertions(+), 10 deletions(-) create mode 100644 migration/src/m20221027_002407_user_tiers.rs create mode 100644 web3_proxy/src/user_token.rs diff --git a/Cargo.lock b/Cargo.lock index d018a993..8397e313 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2657,7 +2657,7 @@ dependencies = [ [[package]] name = "migration" -version = "0.6.0" +version = "0.7.0" dependencies = [ "sea-orm-migration", "tokio", diff --git a/TODO.md b/TODO.md index 103372fa..d536e109 100644 --- a/TODO.md +++ b/TODO.md @@ -210,19 +210,20 @@ These are roughly in order of completition - [x] generate a new key from a web endpoint - [x] modifying key settings such as private relay, revert logging, ip/origin/etc checks - [x] GET logged reverts on an endpoint that **requires authentication**. -- [ ] rename user_key to rpc_key +- [x] endpoint to list keys without having to sign a message to log in again +- [x] rename user_key to rpc_key - [x] in code - - [ ] in database with a migration -- [ ] document url params with examples -- [ ] instead of requests_per_minute on every key, have a "user_tier" that gets joined + - [x] in database with a migration +- [x] instead of requests_per_minute on every key, have a "user_tier" that gets joined +- [ ] document url params with a test that works for examples - [ ] include if archive query or not in the stats - this is already partially done, but we need to double check it works. preferrably with tests - [-] add configurable size limits to all the Caches - [ ] instead of configuring each cache with MB sizes, have one value for total memory footprint and then percentages for each cache - [-] let users choose a % to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly - this must be opt-in or spawned since it will slow things down and will make their calls less private + - [ ] automatic pruning of old revert logs once too many are collected - [ ] we currently default to 0.0 and don't expose a way to edit it. we have a database row, but we don't use it -- [ ] endpoint to list keys without having to sign a message to log in again - [ ] make the "not synced" error more verbose - I think there is a bug in our synced_rpcs filtering. likely in has_block_data - seeing "not synced" when I load https://vfat.tools/esd/ @@ -235,16 +236,16 @@ These are roughly in order of completition - [ ] if no bearer token found in redis (likely because it expired), send 401 unauthorized - [ ] user create script should allow multiple keys per user - [ ] somehow the proxy thought latest was hours behind. need internal health check that forces reconnect if this happens -- [ ] Ulid instead of Uuid for database ids +- [ ] Uuid/Ulid instead of big_unsigned for database ids - might have to use Uuid in sea-orm and then convert to Ulid on display -- [ ] add pruning or aggregating or something to log revert trace. otherwise our databases are going to grow really big - - [ ] after adding this, allow posting to /user/keys to turn on revert logging + - https://www.kostolansky.sk/posts/how-to-migrate-to-uuid/ - [ ] display concurrent requests per api key (only with authentication!) ## V1 These are not yet ordered. +- [ ] change "remember me" to last until 4 weeks of no use, rather than 4 weeks since login - [ ] BUG! if sending transactions gets "INTERNAL_ERROR: existing tx with same hash", fake a success message - ERROR http_request:request:try_send_all_upstream_servers: web3_proxy::rpcs::request: bad response! err=JsonRpcClientError(JsonRpcError(JsonRpcError { code: -32000, message: "INTERNAL_ERROR: existing tx with same hash", data: None })) method=eth_sendRawTransaction rpc=local_erigon_alpha_archive id=01GF4HV03Y4ZNKQV8DW5NDQ5CG method=POST authorized_request=User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 }) self=Web3Connections { conns: {"local_erigon_alpha_archive_ws": Web3Connection { name: "local_erigon_alpha_archive_ws", blocks: "all", .. }, "local_geth_ws": Web3Connection { name: "local_geth_ws", blocks: 64, .. }, "local_erigon_alpha_archive": Web3Connection { name: "local_erigon_alpha_archive", blocks: "all", .. }}, .. } authorized_request=Some(User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 })) request=JsonRpcRequest { id: RawValue(39), method: "eth_sendRawTransaction", .. } request_metadata=Some(RequestMetadata { datetime: 2022-10-11T22:14:57.406829095Z, period_seconds: 60, request_bytes: 633, backend_requests: 0, no_servers: 0, error_response: false, response_bytes: 0, response_millis: 0 }) block_needed=None - [ ] BUG? WARN http_request:request: web3_proxy::block_number: could not get block from params err=unexpected params length id=01GF4HTRKM4JV6NX52XSF9AYMW method=POST authorized_request=User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 }) diff --git a/migration/Cargo.toml b/migration/Cargo.toml index 3cc6bdd1..8422bb72 100644 --- a/migration/Cargo.toml +++ b/migration/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "migration" -version = "0.6.0" +version = "0.7.0" edition = "2021" publish = false diff --git a/migration/src/lib.rs b/migration/src/lib.rs index 6a3aab63..6e4397f6 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -6,6 +6,7 @@ mod m20220928_015108_concurrency_limits; mod m20221007_213828_accounting; mod m20221025_210326_add_chain_id_to_reverts; mod m20221026_230819_rename_user_keys; +mod m20221027_002407_user_tiers; pub struct Migrator; @@ -19,6 +20,7 @@ impl MigratorTrait for Migrator { Box::new(m20221007_213828_accounting::Migration), Box::new(m20221025_210326_add_chain_id_to_reverts::Migration), Box::new(m20221026_230819_rename_user_keys::Migration), + Box::new(m20221027_002407_user_tiers::Migration), ] } } diff --git a/migration/src/m20221027_002407_user_tiers.rs b/migration/src/m20221027_002407_user_tiers.rs new file mode 100644 index 00000000..2d869710 --- /dev/null +++ b/migration/src/m20221027_002407_user_tiers.rs @@ -0,0 +1,172 @@ +use sea_orm_migration::prelude::*; +use sea_orm_migration::sea_orm::ConnectionTrait; +use sea_orm_migration::sea_query::table::ColumnDef; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // tracking request limits per key is going to get annoying. + // so now, we make a "user_tier" table that tracks different tiers of users. + manager + .create_table( + Table::create() + .table(UserTier::Table) + .col( + ColumnDef::new(UserTier::Id) + .big_unsigned() + .not_null() + .auto_increment() + .primary_key(), + ) + .col(ColumnDef::new(UserTier::Title).string().not_null()) + .col(ColumnDef::new(UserTier::RequestsPerMinute).big_unsigned()) + .col(ColumnDef::new(UserTier::MaxConcurrentRequests).unsigned()) + .to_owned(), + ) + .await?; + + // seed data + let user_tiers = Query::insert() + .into_table(UserTier::Table) + .columns([ + UserTier::Title, + UserTier::RequestsPerMinute, + UserTier::MaxConcurrentRequests, + ]) + // // anon users get very low limits. these belong in config though, not the database + // .values_panic(["Anonymous".into(), Some("120").into(), Some("1").into()]) + // free users get higher but still low limits + .values_panic(["Free".into(), Some("360").into(), Some("5").into()]) + // private demos get unlimited request/second + .values_panic([ + "Private Demo".into(), + None::<&str>.into(), + Some("2000").into(), + ]) + // we will definitely have more tiers between "free" and "effectively unlimited" + // incredibly high limits + .values_panic([ + "Effectively Unlimited".into(), + Some("6000000").into(), + Some("10000").into(), + ]) + // no limits + .values_panic(["Unlimited".into(), None::<&str>.into(), None::<&str>.into()]) + .to_owned(); + + manager.exec_stmt(user_tiers).await?; + + let db_conn = manager.get_connection(); + let db_backend = manager.get_database_backend(); + + let select_private_demo_id = Query::select() + .column(UserTier::Id) + .column(UserTier::Title) + .from(UserTier::Table) + .and_having(Expr::col(UserTier::Title).eq("Private Demo")) + .to_owned(); + let private_demo_id: u64 = db_conn + .query_one(db_backend.build(&select_private_demo_id)) + .await? + .expect("we just created Private Demo") + .try_get("", &UserTier::Id.to_string())?; + + // add a foreign key between tiers and users. default to "Private Demo" + manager + .alter_table( + Table::alter() + .table(User::Table) + .add_column( + ColumnDef::new(User::UserTierId) + .big_unsigned() + .default(private_demo_id) + .not_null(), + ) + .add_foreign_key( + TableForeignKey::new() + .from_col(User::UserTierId) + .to_tbl(UserTier::Table) + .to_col(UserTier::Id), + ) + .to_owned(), + ) + .await?; + + // change default to free tier + let select_free_id = Query::select() + .column(UserTier::Id) + .column(UserTier::Title) + .from(UserTier::Table) + .and_having(Expr::col(UserTier::Title).eq("Free")) + .to_owned(); + let free_id: u64 = db_conn + .query_one(db_backend.build(&select_free_id)) + .await? + .expect("we just created Free") + .try_get("", &UserTier::Id.to_string())?; + + manager + .alter_table( + Table::alter() + .table(User::Table) + .modify_column( + ColumnDef::new(User::UserTierId) + .big_unsigned() + .default(free_id) + .not_null(), + ) + .to_owned(), + ) + .await?; + + // delete requests per minute and max concurrent requests now that we have user tiers + manager + .alter_table( + Table::alter() + .table(RpcKeys::Table) + .drop_column(RpcKeys::RequestsPerMinute) + .drop_column(RpcKeys::MaxConcurrentRequests) + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // TODO: drop the index first + + manager + .drop_table(Table::drop().table(UserTier::Table).to_owned()) + .await + + // TODO: undo more + } +} + +/// partial table +#[derive(Iden)] +enum User { + Table, + UserTierId, +} + +#[derive(Iden)] +enum UserTier { + Table, + Id, + Title, + RequestsPerMinute, + MaxConcurrentRequests, +} + +/// partial table +#[derive(Iden)] +enum RpcKeys { + Table, + RequestsPerMinute, + MaxConcurrentRequests, +} diff --git a/web3_proxy/src/user_token.rs b/web3_proxy/src/user_token.rs new file mode 100644 index 00000000..9a323757 --- /dev/null +++ b/web3_proxy/src/user_token.rs @@ -0,0 +1,21 @@ +use axum::headers::authorization::Bearer; +use ulid::Ulid; + +/// Key used for caching the user's login +pub struct UserBearerToken(pub Ulid); + +impl TryFrom for UserBearerToken { + type Error = ulid::DecodeError; + + fn try_from(b: Bearer) -> Result { + let u = Ulid::from_string(b.token())?; + + Ok(UserBearerToken(u)) + } +} + +impl ToString for UserBearerToken { + fn to_string(&self) -> String { + format!("bearer:{}", self.0) + } +}