From 081873e937593dac187d443074171d1af2f98a52 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 13 Dec 2022 18:13:23 -0800 Subject: [PATCH] move pending logins to the database --- Cargo.lock | 86 ++++++++----- deferred-rate-limiter/Cargo.toml | 2 +- entities/Cargo.toml | 6 +- entities/src/login.rs | 37 ++++++ entities/src/mod.rs | 5 +- entities/src/pending_login.rs | 23 ++++ entities/src/prelude.rs | 4 +- entities/src/revert_log.rs | 4 +- entities/src/rpc_accounting.rs | 6 +- entities/src/rpc_key.rs | 34 +++-- entities/src/sea_orm_active_enums.rs | 4 +- entities/src/secondary_user.rs | 2 +- entities/src/serialization.rs | 30 +++++ entities/src/user.rs | 24 ++-- entities/src/user_tier.rs | 2 +- migration/Cargo.toml | 4 +- migration/src/lib.rs | 2 + ...0221213_134158_move_login_into_database.rs | 107 ++++++++++++++++ redis-rate-limiter/Cargo.toml | 4 +- web3_proxy/Cargo.toml | 14 +-- web3_proxy/src/app/mod.rs | 3 +- web3_proxy/src/frontend/authorization.rs | 36 +++--- web3_proxy/src/frontend/errors.rs | 15 ++- web3_proxy/src/frontend/mod.rs | 3 +- web3_proxy/src/frontend/rpc_proxy_http.rs | 9 +- web3_proxy/src/frontend/users.rs | 118 +++++++++++------- web3_proxy/src/user_queries.rs | 80 +++++++++--- web3_proxy/src/user_token.rs | 50 +++++++- 28 files changed, 541 insertions(+), 173 deletions(-) create mode 100644 entities/src/login.rs create mode 100644 entities/src/pending_login.rs create mode 100644 entities/src/serialization.rs create mode 100644 migration/src/m20221213_134158_move_login_into_database.rs diff --git a/Cargo.lock b/Cargo.lock index b46e0886..bce47fc9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -343,9 +343,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "axum" -version = "0.5.17" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acee9fd5073ab6b045a275b3e709c163dd36c90685219cb21804a147b58dba43" +checksum = "08b108ad2665fa3f6e6a517c3d80ec3e77d224c47d605167aefaa5d7ef97fa48" dependencies = [ "async-trait", "axum-core", @@ -363,8 +363,10 @@ dependencies = [ "mime", "percent-encoding", "pin-project-lite", + "rustversion", "serde", "serde_json", + "serde_path_to_error", "serde_urlencoded", "sha-1", "sync_wrapper", @@ -378,9 +380,9 @@ dependencies = [ [[package]] name = "axum-client-ip" -version = "0.2.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92ba6eab8967a7b1121e3cd7491a205dff7a33bb05e65df6c199d687a32e4d3b" +checksum = "a27e888320b2a506263b84ef2f147a68bec582e49b7c6ab8246174aa196e18c8" dependencies = [ "axum", "forwarded-header-value", @@ -388,9 +390,9 @@ dependencies = [ [[package]] name = "axum-core" -version = "0.2.9" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37e5939e02c56fecd5c017c37df4238c0a839fa76b7f97acdd7efb804fd181cc" +checksum = "79b8558f5a0581152dc94dcd289132a1d377494bdeafcd41869b3258e3e2ad92" dependencies = [ "async-trait", "bytes", @@ -398,15 +400,16 @@ dependencies = [ "http", "http-body", "mime", + "rustversion", "tower-layer", "tower-service", ] [[package]] name = "axum-macros" -version = "0.2.3" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6293dae2ec708e679da6736e857cf8532886ef258e92930f38279c12641628b8" +checksum = "e4df0fc33ada14a338b799002f7e8657711422b25d4e16afb032708d6b185621" dependencies = [ "heck 0.4.0", "proc-macro2", @@ -1106,9 +1109,9 @@ dependencies = [ [[package]] name = "deadpool-redis" -version = "0.11.0" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cb2945e05eeebc0f99795a135b2b15c038646d8292946621840d3cbff296e20" +checksum = "2b8bde44cbfdf17ae5baa45c9f43073b320f1a19955389315629304a23909ad2" dependencies = [ "deadpool", "redis", @@ -1347,10 +1350,12 @@ dependencies = [ [[package]] name = "entities" -version = "0.11.0" +version = "0.12.0" dependencies = [ + "ethers", "sea-orm", "serde", + "ulid", "uuid 1.2.2", ] @@ -1502,6 +1507,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "002a0d58a7d921b496f5f19b5b9508d01d25fbe25078286b1fcb6f4e7562acf7" dependencies = [ "ethers-contract-abigen", + "ethers-contract-derive", "ethers-core", "ethers-providers", "futures-util", @@ -1537,6 +1543,21 @@ dependencies = [ "walkdir", ] +[[package]] +name = "ethers-contract-derive" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f192e8e4cf2b038318aae01e94e7644e0659a76219e94bcd3203df744341d61f" +dependencies = [ + "ethers-contract-abigen", + "ethers-core", + "hex", + "proc-macro2", + "quote", + "serde_json", + "syn", +] + [[package]] name = "ethers-core" version = "1.0.0" @@ -2461,9 +2482,9 @@ dependencies = [ [[package]] name = "ipnet" -version = "2.5.1" +version = "2.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f88c5561171189e69df9d98bcf18fd5f9558300f7ea7b801eb8a0fd748bd8745" +checksum = "11b0d96e660696543b251e58030cf9787df56da39dab19ad60eae7353040917e" [[package]] name = "iri-string" @@ -2660,9 +2681,9 @@ dependencies = [ [[package]] name = "matchit" -version = "0.5.0" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb" +checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40" [[package]] name = "md-5" @@ -2720,7 +2741,7 @@ dependencies = [ [[package]] name = "migration" -version = "0.11.0" +version = "0.12.0" dependencies = [ "sea-orm-migration", "tokio", @@ -3865,9 +3886,9 @@ dependencies = [ [[package]] name = "rustversion" -version = "1.0.6" +version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2cc38e8fa666e2de3c4aba7edeb5ffc5246c1c2ed0e3d17e560aeeba736b23f" +checksum = "97477e48b4cf8603ad5f7aaf897467cf42ab4218a38ef76fb14c2d6773a6d6a8" [[package]] name = "ryu" @@ -4252,9 +4273,9 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.149" +version = "1.0.150" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "256b9932320c590e707b94576e3cc1f7c9024d0ee6612dfbcf1cb106cbe8e055" +checksum = "e326c9ec8042f1b5da33252c8a37e9ffbd2c9bef0155215b6e6c80c790e05f91" dependencies = [ "serde_derive", ] @@ -4271,9 +4292,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.149" +version = "1.0.150" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4eae9b04cbffdfd550eb462ed33bc6a1b68c935127d008b27444d08380f94e4" +checksum = "42a3df25b0713732468deadad63ab9da1f1fd75a48a15024b50363f128db627e" dependencies = [ "proc-macro2", "quote", @@ -4291,6 +4312,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "184c643044780f7ceb59104cef98a5a6f12cb2288a7bc701ab93a362b49fd47d" +dependencies = [ + "serde", +] + [[package]] name = "serde_prometheus" version = "0.1.6" @@ -4953,9 +4983,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.22.0" +version = "1.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d76ce4a75fb488c605c54bf610f221cea8b0dafb53333c1a67e8ee199dcd2ae3" +checksum = "eab6d665857cc6ca78d6e80303a02cea7a7851e85dfbd77cbdc09bd129f1ef46" dependencies = [ "autocfg", "bytes", @@ -4969,7 +4999,7 @@ dependencies = [ "socket2", "tokio-macros", "tracing", - "winapi", + "windows-sys 0.42.0", ] [[package]] @@ -5082,9 +5112,9 @@ dependencies = [ [[package]] name = "tower-layer" -version = "0.3.1" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" [[package]] name = "tower-service" @@ -5499,7 +5529,7 @@ dependencies = [ [[package]] name = "web3_proxy" -version = "0.11.0" +version = "0.12.0" dependencies = [ "anyhow", "arc-swap", diff --git a/deferred-rate-limiter/Cargo.toml b/deferred-rate-limiter/Cargo.toml index 4d80c475..a04611a6 100644 --- a/deferred-rate-limiter/Cargo.toml +++ b/deferred-rate-limiter/Cargo.toml @@ -11,4 +11,4 @@ anyhow = "1.0.66" hashbrown = "0.13.1" log = "0.4.17" moka = { version = "0.9.6", default-features = false, features = ["future"] } -tokio = "1.22.0" +tokio = "1.23.0" diff --git a/entities/Cargo.toml b/entities/Cargo.toml index 71846a0b..fc7ed575 100644 --- a/entities/Cargo.toml +++ b/entities/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "entities" -version = "0.11.0" +version = "0.12.0" edition = "2021" [lib] @@ -11,5 +11,7 @@ path = "src/mod.rs" [dependencies] sea-orm = "0.10.5" -serde = "1.0.149" +serde = "1.0.150" uuid = "1.2.2" +ethers = "1.0.2" +ulid = "1.0.0" diff --git a/entities/src/login.rs b/entities/src/login.rs new file mode 100644 index 00000000..e2600e84 --- /dev/null +++ b/entities/src/login.rs @@ -0,0 +1,37 @@ +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.5 + +use crate::serialization; +use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] +#[sea_orm(table_name = "login")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: u64, + #[sea_orm(unique)] + #[serde(serialize_with = "serialization::uuid_as_ulid")] + pub bearer_token: Uuid, + pub user_id: u64, + pub expires_at: DateTimeUtc, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm( + belongs_to = "super::user::Entity", + from = "Column::UserId", + to = "super::user::Column::Id", + on_update = "NoAction", + on_delete = "NoAction" + )] + User, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::User.def() + } +} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/entities/src/mod.rs b/entities/src/mod.rs index 8bcaf5a0..5325e16d 100644 --- a/entities/src/mod.rs +++ b/entities/src/mod.rs @@ -1,11 +1,14 @@ -//! SeaORM Entity. Generated by sea-orm-codegen 0.10.1 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.5 pub mod prelude; +pub mod login; +pub mod pending_login; pub mod revert_log; pub mod rpc_accounting; pub mod rpc_key; pub mod sea_orm_active_enums; pub mod secondary_user; +pub mod serialization; pub mod user; pub mod user_tier; diff --git a/entities/src/pending_login.rs b/entities/src/pending_login.rs new file mode 100644 index 00000000..2ec44223 --- /dev/null +++ b/entities/src/pending_login.rs @@ -0,0 +1,23 @@ +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.5 + +use crate::serialization; +use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] +#[sea_orm(table_name = "pending_login")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: u64, + #[sea_orm(unique)] + #[serde(serialize_with = "serialization::uuid_as_ulid")] + pub nonce: Uuid, + #[sea_orm(column_type = "Text")] + pub message: String, + pub expires_at: DateTimeUtc, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/entities/src/prelude.rs b/entities/src/prelude.rs index 0f31fa60..218a3d47 100644 --- a/entities/src/prelude.rs +++ b/entities/src/prelude.rs @@ -1,5 +1,7 @@ -//! SeaORM Entity. Generated by sea-orm-codegen 0.10.1 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.5 +pub use super::login::Entity as Login; +pub use super::pending_login::Entity as PendingLogin; pub use super::revert_log::Entity as RevertLog; pub use super::rpc_accounting::Entity as RpcAccounting; pub use super::rpc_key::Entity as RpcKey; diff --git a/entities/src/revert_log.rs b/entities/src/revert_log.rs index a2ac9735..9835ef8e 100644 --- a/entities/src/revert_log.rs +++ b/entities/src/revert_log.rs @@ -1,6 +1,7 @@ -//! SeaORM Entity. Generated by sea-orm-codegen 0.10.1 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.5 use super::sea_orm_active_enums::Method; +use crate::serialization; use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; @@ -12,6 +13,7 @@ pub struct Model { pub rpc_key_id: u64, pub timestamp: DateTimeUtc, pub method: Method, + #[serde(serialize_with = "serialization::vec_as_address")] pub to: Vec, #[sea_orm(column_type = "Text", nullable)] pub call_data: Option, diff --git a/entities/src/rpc_accounting.rs b/entities/src/rpc_accounting.rs index 88e3fa5c..e96ba0d8 100644 --- a/entities/src/rpc_accounting.rs +++ b/entities/src/rpc_accounting.rs @@ -1,4 +1,4 @@ -//! SeaORM Entity. Generated by sea-orm-codegen 0.10.1 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.5 use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; @@ -11,13 +11,10 @@ pub struct Model { pub rpc_key_id: Option, pub chain_id: u64, pub method: Option, - pub archive_request: bool, pub error_response: bool, pub period_datetime: DateTimeUtc, pub frontend_requests: u64, pub backend_requests: u64, - // pub backend_retries: u64, - // pub no_servers: u64, pub cache_misses: u64, pub cache_hits: u64, pub sum_request_bytes: u64, @@ -41,6 +38,7 @@ pub struct Model { pub p90_response_bytes: u64, pub p99_response_bytes: u64, pub max_response_bytes: u64, + pub archive_request: bool, pub origin: Option, } diff --git a/entities/src/rpc_key.rs b/entities/src/rpc_key.rs index 6038d242..79a9f4bd 100644 --- a/entities/src/rpc_key.rs +++ b/entities/src/rpc_key.rs @@ -1,6 +1,7 @@ -//! SeaORM Entity. Generated by sea-orm-codegen 0.10.1 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.5 use super::sea_orm_active_enums::LogLevel; +use crate::serialization; use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; @@ -11,12 +12,11 @@ pub struct Model { pub id: u64, pub user_id: u64, #[sea_orm(unique)] + #[serde(serialize_with = "serialization::uuid_as_ulid")] pub secret_key: Uuid, pub description: Option, pub private_txs: bool, pub active: bool, - pub log_level: LogLevel, - pub log_revert_chance: f64, #[sea_orm(column_type = "Text", nullable)] pub allowed_ips: Option, #[sea_orm(column_type = "Text", nullable)] @@ -25,16 +25,16 @@ pub struct Model { pub allowed_referers: Option, #[sea_orm(column_type = "Text", nullable)] pub allowed_user_agents: Option, -} - -pub enum RpcKeyLogLevels { - None, - Basic, - Detailed, + pub log_revert_chance: f64, + pub log_level: LogLevel, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] pub enum Relation { + #[sea_orm(has_many = "super::revert_log::Entity")] + RevertLog, + #[sea_orm(has_many = "super::rpc_accounting::Entity")] + RpcAccounting, #[sea_orm( belongs_to = "super::user::Entity", from = "Column::UserId", @@ -43,16 +43,6 @@ pub enum Relation { on_delete = "NoAction" )] User, - #[sea_orm(has_many = "super::revert_log::Entity")] - RevertLog, - #[sea_orm(has_many = "super::rpc_accounting::Entity")] - RpcAccounting, -} - -impl Related for Entity { - fn to() -> RelationDef { - Relation::User.def() - } } impl Related for Entity { @@ -67,4 +57,10 @@ impl Related for Entity { } } +impl Related for Entity { + fn to() -> RelationDef { + Relation::User.def() + } +} + impl ActiveModelBehavior for ActiveModel {} diff --git a/entities/src/sea_orm_active_enums.rs b/entities/src/sea_orm_active_enums.rs index fc91be63..593e2c17 100644 --- a/entities/src/sea_orm_active_enums.rs +++ b/entities/src/sea_orm_active_enums.rs @@ -1,11 +1,10 @@ -//! SeaORM Entity. Generated by sea-orm-codegen 0.10.1 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.5 use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] #[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "log_level")] -#[strum(ascii_case_insensitive)] pub enum LogLevel { #[sea_orm(string_value = "none")] None, @@ -31,7 +30,6 @@ pub enum Method { #[sea_orm(string_value = "eth_sendRawTransaction")] EthSendRawTransaction, } - #[derive(Debug, Clone, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] #[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "role")] pub enum Role { diff --git a/entities/src/secondary_user.rs b/entities/src/secondary_user.rs index 275f3223..86c8c0e7 100644 --- a/entities/src/secondary_user.rs +++ b/entities/src/secondary_user.rs @@ -1,4 +1,4 @@ -//! SeaORM Entity. Generated by sea-orm-codegen 0.10.1 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.5 use super::sea_orm_active_enums::Role; use sea_orm::entity::prelude::*; diff --git a/entities/src/serialization.rs b/entities/src/serialization.rs new file mode 100644 index 00000000..53182c25 --- /dev/null +++ b/entities/src/serialization.rs @@ -0,0 +1,30 @@ +//! sea-orm types don't always serialize how we want. this helps that, though it won't help every case. +use ethers::prelude::Address; +use sea_orm::prelude::Uuid; +use serde::{Serialize, Serializer}; +use std::convert::TryInto; +use ulid::Ulid; + +pub fn to_fixed_length(v: Vec) -> [T; N] { + v.try_into() + .unwrap_or_else(|v: Vec| panic!("Expected a Vec of length {} but it was {}", N, v.len())) +} + +pub fn vec_as_address(x: &[u8], s: S) -> Result +where + S: Serializer, +{ + let x = Address::from_slice(x); + + x.serialize(s) +} + +pub fn uuid_as_ulid(x: &Uuid, s: S) -> Result +where + S: Serializer, +{ + let x = Ulid::from(x.as_u128()); + + // TODO: to_string shouldn't be needed, but i'm still seeing Uuid length + x.to_string().serialize(s) +} diff --git a/entities/src/user.rs b/entities/src/user.rs index fd121e59..48c89c08 100644 --- a/entities/src/user.rs +++ b/entities/src/user.rs @@ -1,5 +1,6 @@ -//! SeaORM Entity. Generated by sea-orm-codegen 0.10.1 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.5 +use crate::serialization; use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; @@ -9,6 +10,7 @@ pub struct Model { #[sea_orm(primary_key)] pub id: u64, #[sea_orm(unique)] + #[serde(serialize_with = "serialization::vec_as_address")] pub address: Vec, pub description: Option, pub email: Option, @@ -17,6 +19,12 @@ pub struct Model { #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] pub enum Relation { + #[sea_orm(has_many = "super::login::Entity")] + Login, + #[sea_orm(has_many = "super::rpc_key::Entity")] + RpcKey, + #[sea_orm(has_many = "super::secondary_user::Entity")] + SecondaryUser, #[sea_orm( belongs_to = "super::user_tier::Entity", from = "Column::UserTierId", @@ -25,15 +33,11 @@ pub enum Relation { on_delete = "NoAction" )] UserTier, - #[sea_orm(has_many = "super::rpc_key::Entity")] - RpcKey, - #[sea_orm(has_many = "super::secondary_user::Entity")] - SecondaryUser, } -impl Related for Entity { +impl Related for Entity { fn to() -> RelationDef { - Relation::UserTier.def() + Relation::Login.def() } } @@ -49,4 +53,10 @@ impl Related for Entity { } } +impl Related for Entity { + fn to() -> RelationDef { + Relation::UserTier.def() + } +} + impl ActiveModelBehavior for ActiveModel {} diff --git a/entities/src/user_tier.rs b/entities/src/user_tier.rs index c3948b37..5d069e74 100644 --- a/entities/src/user_tier.rs +++ b/entities/src/user_tier.rs @@ -1,4 +1,4 @@ -//! SeaORM Entity. Generated by sea-orm-codegen 0.10.1 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.5 use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; diff --git a/migration/Cargo.toml b/migration/Cargo.toml index be9a7417..df2983b2 100644 --- a/migration/Cargo.toml +++ b/migration/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "migration" -version = "0.11.0" +version = "0.12.0" edition = "2021" publish = false @@ -9,7 +9,7 @@ name = "migration" path = "src/lib.rs" [dependencies] -tokio = { version = "1.22.0", features = ["full", "tracing"] } +tokio = { version = "1.23.0", features = ["full", "tracing"] } [dependencies.sea-orm-migration] version = "0.10.5" diff --git a/migration/src/lib.rs b/migration/src/lib.rs index 381099b8..0f221af2 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -11,6 +11,7 @@ mod m20221031_211916_clean_up; mod m20221101_222349_archive_request; mod m20221108_200345_save_anon_stats; mod m20221211_124002_request_method_privacy; +mod m20221213_134158_move_login_into_database; pub struct Migrator; @@ -29,6 +30,7 @@ impl MigratorTrait for Migrator { Box::new(m20221101_222349_archive_request::Migration), Box::new(m20221108_200345_save_anon_stats::Migration), Box::new(m20221211_124002_request_method_privacy::Migration), + Box::new(m20221213_134158_move_login_into_database::Migration), ] } } diff --git a/migration/src/m20221213_134158_move_login_into_database.rs b/migration/src/m20221213_134158_move_login_into_database.rs new file mode 100644 index 00000000..28e8f4f7 --- /dev/null +++ b/migration/src/m20221213_134158_move_login_into_database.rs @@ -0,0 +1,107 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(PendingLogin::Table) + .col( + ColumnDef::new(PendingLogin::Id) + .big_unsigned() + .not_null() + .auto_increment() + .primary_key(), + ) + .col( + ColumnDef::new(PendingLogin::Nonce) + .uuid() + .not_null() + .unique_key(), + ) + .col(ColumnDef::new(PendingLogin::Message).text().not_null()) + .col( + ColumnDef::new(PendingLogin::ExpiresAt) + .timestamp() + .not_null(), + ) + .to_owned(), + ) + .await?; + + manager + .create_table( + Table::create() + .table(Login::Table) + .col( + ColumnDef::new(Login::Id) + .big_unsigned() + .not_null() + .auto_increment() + .primary_key(), + ) + .col( + ColumnDef::new(Login::BearerToken) + .uuid() + .not_null() + .unique_key(), + ) + .col(ColumnDef::new(Login::UserId).big_unsigned().not_null()) + .col( + ColumnDef::new(PendingLogin::ExpiresAt) + .timestamp() + .not_null(), + ) + .foreign_key( + ForeignKeyCreateStatement::new() + .from_col(Login::UserId) + .to_tbl(User::Table) + .to_col(User::Id), + ) + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table(Table::drop().table(PendingLogin::Table).to_owned()) + .await?; + + manager + .drop_table(Table::drop().table(Login::Table).to_owned()) + .await?; + + Ok(()) + } +} + +/// Partial table definition +#[derive(Iden)] +pub enum User { + Table, + Id, +} + +#[derive(Iden)] +enum PendingLogin { + Table, + Id, + Nonce, + Message, + ExpiresAt, +} + +#[derive(Iden)] +enum Login { + Table, + Id, + BearerToken, + UserId, +} diff --git a/redis-rate-limiter/Cargo.toml b/redis-rate-limiter/Cargo.toml index b8d35b3b..72494f38 100644 --- a/redis-rate-limiter/Cargo.toml +++ b/redis-rate-limiter/Cargo.toml @@ -6,5 +6,5 @@ edition = "2021" [dependencies] anyhow = "1.0.66" -deadpool-redis = { version = "0.11.0", features = ["rt_tokio_1", "serde"] } -tokio = "1.22.0" +deadpool-redis = { version = "0.11.1", features = ["rt_tokio_1", "serde"] } +tokio = "1.23.0" diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index a654e2ea..423c9098 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "web3_proxy" -version = "0.11.0" +version = "0.12.0" edition = "2021" default-run = "web3_proxy" @@ -22,9 +22,9 @@ thread-fast-rng = { path = "../thread-fast-rng" } anyhow = { version = "1.0.66", features = ["backtrace"] } arc-swap = "1.5.1" argh = "0.1.9" -axum = { version = "0.5.17", features = ["headers", "serde_json", "tokio-tungstenite", "ws"] } -axum-client-ip = "0.2.0" -axum-macros = "0.2.3" +axum = { version = "0.6.1", features = ["headers", "ws"] } +axum-client-ip = "0.3.0" +axum-macros = "0.3.0" # TODO: import chrono from sea-orm so we always have the same version chrono = "0.4.23" counter = "0.5.7" @@ -38,7 +38,7 @@ futures = { version = "0.3.25", features = ["thread-pool"] } hashbrown = { version = "0.13.1", features = ["serde"] } hdrhistogram = "7.5.2" http = "0.2.8" -ipnet = "2.5.1" +ipnet = "2.7.0" log = "0.4.17" metered = { version = "0.9.0", features = ["serialize"] } moka = { version = "0.9.6", default-features = false, features = ["future"] } @@ -55,12 +55,12 @@ handlebars = "4.3.5" rustc-hash = "1.1.0" siwe = "0.5.0" sentry = { version = "0.29.1", default-features = false, features = ["backtrace", "contexts", "panic", "anyhow", "reqwest", "rustls", "log", "sentry-log"] } -serde = { version = "1.0.149", features = [] } +serde = { version = "1.0.150", features = [] } serde_json = { version = "1.0.89", default-features = false, features = ["alloc", "raw_value"] } serde_prometheus = "0.1.6" # TODO: make sure this time version matches siwe. PR to put this in their prelude time = "0.3.17" -tokio = { version = "1.22.0", features = ["full"] } +tokio = { version = "1.23.0", features = ["full"] } # TODO: make sure this uuid version matches sea-orm. PR to put this in their prelude tokio-stream = { version = "0.1.11", features = ["sync"] } toml = "0.5.9" diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 01498076..846c9961 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -13,6 +13,7 @@ use crate::rpcs::blockchain::{ArcBlock, SavedBlock}; use crate::rpcs::connections::Web3Connections; use crate::rpcs::request::OpenRequestHandleMetrics; use crate::rpcs::transactions::TxStatus; +use crate::user_token::UserBearerToken; use anyhow::Context; use axum::headers::{Origin, Referer, UserAgent}; use deferred_rate_limiter::DeferredRateLimiter; @@ -123,7 +124,7 @@ pub struct Web3ProxyApp { Cache, hashbrown::hash_map::DefaultHashBuilder>, pub ip_semaphores: Cache, hashbrown::hash_map::DefaultHashBuilder>, pub bearer_token_semaphores: - Cache, hashbrown::hash_map::DefaultHashBuilder>, + Cache, hashbrown::hash_map::DefaultHashBuilder>, pub stat_sender: Option>, } diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 38dc51f9..24962a64 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -8,13 +8,12 @@ use axum::headers::authorization::Bearer; use axum::headers::{Header, Origin, Referer, UserAgent}; use chrono::Utc; use deferred_rate_limiter::DeferredRateLimitResult; -use entities::{rpc_key, user, user_tier}; +use entities::{login, rpc_key, user, user_tier}; use hashbrown::HashMap; use http::HeaderValue; use ipnet::IpNet; use log::error; use migration::sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter}; -use redis_rate_limiter::redis::AsyncCommands; use redis_rate_limiter::RedisRateLimitResult; use std::fmt::Display; use std::sync::atomic::{AtomicBool, AtomicU64}; @@ -422,11 +421,14 @@ impl Web3ProxyApp { pub async fn bearer_is_authorized( &self, bearer: Bearer, - ) -> anyhow::Result<(user::Model, OwnedSemaphorePermit)> { + ) -> Result<(user::Model, OwnedSemaphorePermit), FrontendErrorResponse> { + // get the user id for this bearer token + let user_bearer_token = UserBearerToken::try_from(bearer)?; + // limit concurrent requests let semaphore = self .bearer_token_semaphores - .get_with(bearer.token().to_string(), async move { + .get_with(user_bearer_token.clone(), async move { let s = Semaphore::new(self.config.bearer_token_max_concurrent_requests as usize); Arc::new(s) }) @@ -434,26 +436,20 @@ impl Web3ProxyApp { let semaphore_permit = semaphore.acquire_owned().await?; - // get the user id for this bearer token - // TODO: move redis key building to a helper function - let bearer_cache_key = UserBearerToken::try_from(bearer)?.to_string(); + // get the attached address from the database for the given auth_token. + let db_conn = self + .db_conn() + .context("checking if bearer token is authorized")?; - // get the attached address from redis for the given auth_token. - let mut redis_conn = self.redis_conn().await?; + let user_bearer_uuid: Uuid = user_bearer_token.into(); - let user_id: u64 = redis_conn - .get::<_, Option>(bearer_cache_key) - .await - .context("fetching bearer cache key from redis")? - .context("unknown bearer token")?; - - // turn user id into a user - let db_conn = self.db_conn().context("Getting database connection")?; - let user = user::Entity::find_by_id(user_id) + let user = user::Entity::find() + .left_join(login::Entity) + .filter(login::Column::BearerToken.eq(user_bearer_uuid)) .one(&db_conn) .await - .context("fetching user from db by id")? - .context("unknown user id")?; + .context("fetching user from db by bearer token")? + .context("unknown bearer token")?; Ok((user, semaphore_permit)) } diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index ff3a1d1f..fdbbbe04 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -16,7 +16,7 @@ use migration::sea_orm::DbErr; use redis_rate_limiter::redis::RedisError; use reqwest::header::ToStrError; use std::error::Error; -use tokio::{task::JoinError, time::Instant}; +use tokio::{sync::AcquireError, task::JoinError, time::Instant}; // TODO: take "IntoResponse" instead of Response? pub type FrontendResult = Result; @@ -26,6 +26,7 @@ pub type FrontendResult = Result; pub enum FrontendErrorResponse { AccessDenied, Anyhow(anyhow::Error), + SemaphoreAcquireError(AcquireError), Box(Box), Database(DbErr), HeadersError(headers::Error), @@ -199,6 +200,18 @@ impl IntoResponse for FrontendErrorResponse { debug_assert_ne!(r.status(), StatusCode::OK); return r; } + Self::SemaphoreAcquireError(err) => { + warn!("semaphore acquire err={:?}", err); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_string( + // TODO: is it safe to expose all of our anyhow strings? + "semaphore acquire error".to_string(), + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } Self::StatusCode(status_code, err_msg, err) => { // different status codes should get different error levels. 500s should warn. 400s should stat let code = status_code.as_u16(); diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index f77b30c0..e4afd657 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -10,7 +10,6 @@ pub mod users; use crate::app::Web3ProxyApp; use axum::{ - handler::Handler, routing::{get, post, put}, Extension, Router, }; @@ -93,7 +92,7 @@ pub async fn serve(port: u16, proxy_app: Arc) -> anyhow::Result<() // frontend caches .layer(Extension(response_cache)) // 404 for any unknown routes - .fallback(errors::handler_404.into_service()); + .fallback(errors::handler_404); // run our app with hyper // TODO: allow only listening on localhost? top_config.app.host.parse()? diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs index b6722332..fc03c712 100644 --- a/web3_proxy/src/frontend/rpc_proxy_http.rs +++ b/web3_proxy/src/frontend/rpc_proxy_http.rs @@ -15,7 +15,6 @@ use std::sync::Arc; /// Defaults to rate limiting by IP address, but can also read the Authorization header for a bearer token. /// If possible, please use a WebSocket instead. #[debug_handler] - pub async fn proxy_web3_rpc( Extension(app): Extension>, ClientIp(ip): ClientIp, @@ -42,15 +41,14 @@ pub async fn proxy_web3_rpc( /// Can optionally authorized based on origin, referer, or user agent. /// If possible, please use a WebSocket instead. #[debug_handler] - pub async fn proxy_web3_rpc_with_key( Extension(app): Extension>, ClientIp(ip): ClientIp, - Json(payload): Json, origin: Option>, referer: Option>, user_agent: Option>, Path(rpc_key): Path, + Json(payload): Json, ) -> FrontendResult { let rpc_key = rpc_key.parse()?; @@ -69,10 +67,7 @@ pub async fn proxy_web3_rpc_with_key( // the request can take a while, so we spawn so that we can start serving another request // TODO: spawn even earlier? - let f = tokio::spawn(async move { - app.proxy_web3_rpc(authorization, payload) - .await - }); + let f = tokio::spawn(async move { app.proxy_web3_rpc(authorization, payload).await }); // if this is an error, we are likely shutting down let response = f.await??; diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index 0d1fdec5..601bf3fa 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -18,19 +18,20 @@ use axum::{ }; use axum_client_ip::ClientIp; use axum_macros::debug_handler; +use chrono::{TimeZone, Utc}; use entities::sea_orm_active_enums::LogLevel; -use entities::{revert_log, rpc_key, user}; +use entities::{login, pending_login, revert_log, rpc_key, user}; use ethers::{prelude::Address, types::Bytes}; use hashbrown::HashMap; use http::{HeaderValue, StatusCode}; use ipnet::IpNet; use itertools::Itertools; use log::warn; +use migration::sea_orm::prelude::Uuid; use migration::sea_orm::{ self, ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, PaginatorTrait, QueryFilter, QueryOrder, TransactionTrait, TryIntoModel, }; -use redis_rate_limiter::redis::AsyncCommands; use serde::Deserialize; use serde_json::json; use siwe::{Message, VerificationOpts}; @@ -84,7 +85,7 @@ pub async fn user_login_get( .context("impossible")? .parse() // TODO: map_err so this becomes a 401 - .context("bad input")?; + .context("unable to parse address")?; let login_domain = app .config @@ -95,6 +96,7 @@ pub async fn user_login_get( // TODO: get most of these from the app config let message = Message { // TODO: don't unwrap + // TODO: accept a login_domain from the request? domain: login_domain.parse().unwrap(), address: user_address.to_fixed_bytes(), // TODO: config for statement @@ -111,16 +113,28 @@ pub async fn user_login_get( resources: vec![], }; - // TODO: if no redis server, store in local cache? at least give a better error. right now this seems to be a 502 - // the address isn't enough. we need to save the actual message so we can read the nonce - // TODO: what message format is the most efficient to store in redis? probably eip191_bytes - // we add 1 to expire_seconds just to be sure redis has the key for the full expiration_time - // TODO: store a maximum number of attempted logins? anyone can request so we don't want to allow DOS attacks - let session_key = format!("login_nonce:{}", nonce); - app.redis_conn() - .await? - .set_ex(session_key, message.to_string(), expire_seconds + 1) - .await?; + let db_conn = app.db_conn().context("login requires a database")?; + + // massage types to fit in the database. sea-orm does not make this very elegant + let uuid = Uuid::from_u128(nonce.into()); + // we add 1 to expire_seconds just to be sure the database has the key for the full expiration_time + let expires_at = Utc + .timestamp_opt(expiration_time.unix_timestamp() + 1, 0) + .unwrap(); + + // we do not store a maximum number of attempted logins. anyone can request so we don't want to allow DOS attacks + // add a row to the database for this user + let user_pending_login = pending_login::ActiveModel { + id: sea_orm::NotSet, + nonce: sea_orm::Set(uuid), + message: sea_orm::Set(message.to_string()), + expires_at: sea_orm::Set(expires_at), + }; + + user_pending_login + .save(&db_conn) + .await + .context("saving user's pending_login")?; // there are multiple ways to sign messages and not all wallets support them // TODO: default message eip from config? @@ -163,12 +177,11 @@ pub struct PostLogin { /// It is recommended to save the returned bearer token in a cookie. /// The bearer token can be used to authenticate other requests, such as getting the user's stats or modifying the user's profile. #[debug_handler] - pub async fn user_login_post( Extension(app): Extension>, ClientIp(ip): ClientIp, - Json(payload): Json, Query(query): Query, + Json(payload): Json, ) -> FrontendResult { login_is_authorized(&app, ip).await?; @@ -200,16 +213,25 @@ pub async fn user_login_post( // the only part of the message we will trust is their nonce // TODO: this is fragile. have a helper function/struct for redis keys - let login_nonce_key = format!("login_nonce:{}", &their_msg.nonce); + let login_nonce = UserBearerToken::from_str(&their_msg.nonce)?; - // fetch the message we gave them from our redis - let mut redis_conn = app.redis_conn().await?; + // fetch the message we gave them from our database + let db_conn = app.db_conn().context("Getting database connection")?; - let our_msg: Option = redis_conn.get(&login_nonce_key).await?; + // massage type for the db + let login_nonce_uuid: Uuid = login_nonce.clone().into(); - let our_msg: String = our_msg.context("login nonce not found")?; + let user_pending_login = pending_login::Entity::find() + .filter(pending_login::Column::Nonce.eq(login_nonce_uuid)) + .one(&db_conn) + .await + .context("database error while finding pending_login")? + .context("login nonce not found")?; - let our_msg: siwe::Message = our_msg.parse().context("parsing siwe message")?; + let our_msg: siwe::Message = user_pending_login + .message + .parse() + .context("parsing siwe message")?; // default options are fine. the message includes timestamp and domain and nonce let verify_config = VerificationOpts::default(); @@ -234,8 +256,6 @@ pub async fn user_login_post( } } - let db_conn = app.db_conn().context("Getting database connection")?; - // TODO: limit columns or load whole user? let u = user::Entity::find() .filter(user::Column::Address.eq(our_msg.address.as_ref())) @@ -305,7 +325,7 @@ pub async fn user_login_post( }; // create a bearer token for the user. - let bearer_token = Ulid::new(); + let user_bearer_token = UserBearerToken::default(); // json response with everything in it // we could return just the bearer token, but I think they will always request api keys and the user profile @@ -314,27 +334,37 @@ pub async fn user_login_post( .into_iter() .map(|uk| (uk.id, uk)) .collect::>(), - "bearer_token": bearer_token, + "bearer_token": user_bearer_token, "user": u, }); let response = (status_code, Json(response_json)).into_response(); - // add bearer to redis - // TODO: use a helper function/struct for this - let bearer_redis_key = UserBearerToken(bearer_token).to_string(); + // add bearer to the database // expire in 4 weeks - // TODO: get expiration time from app config - redis_conn - .set_ex(bearer_redis_key, u.id.to_string(), 2_419_200) - .await?; + let expires_at = Utc::now() + .checked_add_signed(chrono::Duration::weeks(4)) + .unwrap(); - if let Err(err) = redis_conn.del::<_, u64>(&login_nonce_key).await { - warn!( - "Failed to delete login_nonce_key {}: {}", - login_nonce_key, err - ); + let user_login = login::ActiveModel { + id: sea_orm::NotSet, + bearer_token: sea_orm::Set(user_bearer_token.uuid()), + user_id: sea_orm::Set(u.id), + expires_at: sea_orm::Set(expires_at), + }; + + user_login + .save(&db_conn) + .await + .context("saving user login")?; + + if let Err(err) = user_pending_login + .into_active_model() + .delete(&db_conn) + .await + { + warn!("Failed to delete nonce:{}: {}", login_nonce.0, err); } Ok(response) @@ -342,17 +372,21 @@ pub async fn user_login_post( /// `POST /user/logout` - Forget the bearer token in the `Authentication` header. #[debug_handler] - pub async fn user_logout_post( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, ) -> FrontendResult { - let mut redis_conn = app.redis_conn().await?; + let user_bearer = UserBearerToken::try_from(bearer)?; - // TODO: i don't like this. move this to a helper function so it is less fragile - let bearer_cache_key = UserBearerToken::try_from(bearer)?.to_string(); + let db_conn = app.db_conn().context("database needed for user logout")?; - redis_conn.del(bearer_cache_key).await?; + if let Err(err) = login::Entity::delete_many() + .filter(login::Column::BearerToken.eq(user_bearer.uuid())) + .exec(&db_conn) + .await + { + warn!("Failed to delete {}: {}", user_bearer.redis_key(), err); + } // TODO: what should the response be? probably json something Ok("goodbye".into_response()) diff --git a/web3_proxy/src/user_queries.rs b/web3_proxy/src/user_queries.rs index d6edf197..8a329d7a 100644 --- a/web3_proxy/src/user_queries.rs +++ b/web3_proxy/src/user_queries.rs @@ -5,20 +5,24 @@ use axum::{ headers::{authorization::Bearer, Authorization}, TypedHeader, }; -use chrono::NaiveDateTime; -use entities::{rpc_accounting, rpc_key}; +use chrono::{NaiveDateTime, Utc}; +use entities::{login, rpc_accounting, rpc_key}; use hashbrown::HashMap; use http::StatusCode; +use log::{debug, warn}; use migration::sea_orm::{ - ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder, QuerySelect, Select, + ColumnTrait, DatabaseConnection, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder, + QuerySelect, Select, }; use migration::{Condition, Expr, SimpleExpr}; use redis_rate_limiter::{redis::AsyncCommands, RedisConnection}; -/// get the attached address from redis for the given auth_token. +/// get the attached address for the given bearer token. +/// First checks redis. Then checks the database. /// 0 means all users pub async fn get_user_id_from_params( mut redis_conn: RedisConnection, + db_conn: DatabaseConnection, // this is a long type. should we strip it down? bearer: Option>>, params: &HashMap, @@ -26,22 +30,70 @@ pub async fn get_user_id_from_params( match (bearer, params.get("user_id")) { (Some(TypedHeader(Authorization(bearer))), Some(user_id)) => { // check for the bearer cache key - let bearer_cache_key = UserBearerToken::try_from(bearer)?.to_string(); + let user_bearer_token = UserBearerToken::try_from(bearer)?; + + let user_redis_key = user_bearer_token.redis_key(); + + let mut save_to_redis = false; // get the user id that is attached to this bearer token - let bearer_user_id = redis_conn - .get::<_, u64>(bearer_cache_key) - .await - // TODO: this should be a 403 - .context("fetching rpc_key_id from redis with bearer_cache_key")?; + let bearer_user_id = match redis_conn.get::<_, u64>(&user_redis_key).await { + Err(_) => { + // TODO: inspect the redis error? if redis is down we should warn + + let user_login = login::Entity::find() + .filter(login::Column::BearerToken.eq(user_bearer_token.uuid())) + .one(&db_conn) + .await + .context("database error while querying for user")? + .ok_or(FrontendErrorResponse::AccessDenied)?; + + // check expiration. if expired, delete ALL expired pending_logins + let now = Utc::now(); + + if now > user_login.expires_at { + // this row is expired! do not allow auth! + + let delete_result = login::Entity::delete_many() + .filter(login::Column::ExpiresAt.lte(now)) + .exec(&db_conn) + .await?; + + debug!("cleared expired pending_logins: {:?}", delete_result); + + return Err(FrontendErrorResponse::AccessDenied); + } + + save_to_redis = true; + + user_login.user_id + } + Ok(x) => { + // TODO: save it to redis again to extend the key? + + x + } + }; let user_id: u64 = user_id.parse().context("Parsing user_id param")?; if bearer_user_id != user_id { - Err(FrontendErrorResponse::AccessDenied) - } else { - Ok(bearer_user_id) + return Err(FrontendErrorResponse::AccessDenied); } + + if save_to_redis { + // TODO: how long? we store in database for 4 weeks + let one_day = 60 * 60 * 24; + + if let Err(err) = redis_conn + .set_ex::<_, _, ()>(user_redis_key, user_id, one_day) + .await + { + warn!("Unable to save user bearer token to redis: {}", err) + } + } + + Ok(bearer_user_id) } (_, None) => { // they have a bearer token. we don't care about it on public pages @@ -280,7 +332,7 @@ pub async fn query_user_stats<'a>( // get_user_id_from_params checks that the bearer is connected to this user_id // TODO: match on user_id and rpc_key_id? - let user_id = get_user_id_from_params(redis_conn, bearer, params).await?; + let user_id = get_user_id_from_params(redis_conn, db_conn.clone(), bearer, params).await?; let (condition, q) = if user_id == 0 { // 0 means everyone. don't filter on user // TODO: 0 or None? diff --git a/web3_proxy/src/user_token.rs b/web3_proxy/src/user_token.rs index 9a323757..6c017eb0 100644 --- a/web3_proxy/src/user_token.rs +++ b/web3_proxy/src/user_token.rs @@ -1,9 +1,53 @@ +use std::str::FromStr; + use axum::headers::authorization::Bearer; +use migration::sea_orm::prelude::Uuid; +use serde::Serialize; use ulid::Ulid; /// Key used for caching the user's login +#[derive(Clone, Hash, PartialEq, Eq, Serialize)] +#[serde(transparent)] pub struct UserBearerToken(pub Ulid); +impl UserBearerToken { + pub fn redis_key(&self) -> String { + format!("bearer:{}", self.0) + } + + pub fn uuid(&self) -> Uuid { + Uuid::from_u128(self.0.into()) + } +} + +impl Default for UserBearerToken { + fn default() -> Self { + Self(Ulid::new()) + } +} + +impl FromStr for UserBearerToken { + type Err = ulid::DecodeError; + + fn from_str(s: &str) -> Result { + let ulid = Ulid::from_str(s)?; + + Ok(Self(ulid)) + } +} + +impl From for UserBearerToken { + fn from(x: Ulid) -> Self { + Self(x) + } +} + +impl From for Uuid { + fn from(x: UserBearerToken) -> Self { + x.uuid() + } +} + impl TryFrom for UserBearerToken { type Error = ulid::DecodeError; @@ -13,9 +57,3 @@ impl TryFrom for UserBearerToken { Ok(UserBearerToken(u)) } } - -impl ToString for UserBearerToken { - fn to_string(&self) -> String { - format!("bearer:{}", self.0) - } -}