From 823017236635c8cd159603e01bfbca61588e83c3 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 25 Oct 2022 21:10:05 +0000 Subject: [PATCH] more user endpoints --- Cargo.lock | 6 +- bugs.md | 36 +++ entities/Cargo.toml | 2 +- entities/src/revert_logs.rs | 1 + migration/Cargo.toml | 2 +- migration/src/lib.rs | 2 + .../m20220928_015108_concurrency_limits.rs | 1 - ...20221025_210326_add_chain_id_to_reverts.rs | 67 +++++ web3_proxy/Cargo.toml | 2 +- .../src/bin/web3_proxy_cli/create_user.rs | 18 +- web3_proxy/src/frontend/authorization.rs | 95 +++++-- web3_proxy/src/frontend/mod.rs | 6 +- web3_proxy/src/frontend/users.rs | 251 ++++++++---------- web3_proxy/src/user_queries.rs | 158 +---------- 14 files changed, 319 insertions(+), 328 deletions(-) create mode 100644 bugs.md create mode 100644 migration/src/m20221025_210326_add_chain_id_to_reverts.rs diff --git a/Cargo.lock b/Cargo.lock index a945996e..a11ffeeb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1329,7 +1329,7 @@ dependencies = [ [[package]] name = "entities" -version = "0.4.0" +version = "0.5.0" dependencies = [ "sea-orm", "serde", @@ -2632,7 +2632,7 @@ dependencies = [ [[package]] name = "migration" -version = "0.4.0" +version = "0.5.0" dependencies = [ "sea-orm-migration", "tokio", @@ -5466,7 +5466,7 @@ dependencies = [ [[package]] name = "web3_proxy" -version = "0.4.0" +version = "0.5.0" dependencies = [ "anyhow", "arc-swap", diff --git a/bugs.md b/bugs.md new file mode 100644 index 00000000..9a7f0067 --- /dev/null +++ b/bugs.md @@ -0,0 +1,36 @@ +# deadlock + +goerli_1 | 1 deadlocks detected +goerli_1 | Deadlock #0 +goerli_1 | Thread Id 139881298757376 +goerli_1 | 0: 0x5608f7f762d9 - backtrace::backtrace::trace::hbe74611947a262af +goerli_1 | 1: 0x5608f7f7a967 - backtrace::capture::Backtrace::new::h667fe9ee7ec04c33 +goerli_1 | 2: 0x5608f7f6ed33 - parking_lot_core::parking_lot::deadlock_impl::on_unpark::h78879313dd6461e5 +goerli_1 | 3: 0x5608f730dcd4 - parking_lot::raw_mutex::RawMutex::lock_slow::h9c58bf1ec322b8f6 +goerli_1 | 4: 0x5608f78f2e87 - as core::ops::drop::Drop>::drop::h4887dbe8ef7d7472 +goerli_1 | 5: 0x5608f7909362 - alloc::sync::Arc::drop_slow::h3de3d854b76812ea +goerli_1 | 6: 0x5608f7919596 - core::ptr::drop_in_place,ahash::random_state::RandomState>>::h1bf4d8ebf87406ed +goerli_1 | 7: 0x5608f791ac00 - triomphe::arc::Arc::drop_slow::h246e78aee1f2a265 +goerli_1 | 8: 0x5608f78e38bd - crossbeam_epoch::deferred::Deferred::new::call::h395b93588d5e21a9 +goerli_1 | 9: 0x5608f72fbaa2 - crossbeam_epoch::internal::Global::collect::h77479fc8b8898340 +goerli_1 | 10: 0x5608f73ef22c - as moka::common::concurrent::housekeeper::InnerSync>::sync::h07f3f4f6db1c2598 +goerli_1 | 11: 0x5608f75e4ee3 - moka::common::concurrent::housekeeper::ThreadPoolHousekeeper::call_sync::h11b70044870c94f4 +goerli_1 | 12: 0x5608f75e4b03 - moka::common::concurrent::housekeeper::ThreadPoolHousekeeper::start_periodical_sync_job::{{closure}}::hdc1c253b1b156548 +goerli_1 | 13: 0x5608f7cc8d15 - scheduled_thread_pool::Worker::run_job::hb3ae60b61103071b +goerli_1 | 14: 0x5608f7cc8b8b - scheduled_thread_pool::Worker::run::h760e10fe3281c379 +goerli_1 | 15: 0x5608f7ccb294 - std::sys_common::backtrace::__rust_begin_short_backtrace::hc3b55a28c2ef3a5f +goerli_1 | 16: 0x5608f7cc9cb5 - core::ops::function::FnOnce::call_once{{vtable.shim}}::hf330c4157d74cf0e +goerli_1 | 17: 0x5608f7fc8dd3 - as core::ops::function::FnOnce>::call_once::h56d5fc072706762b +goerli_1 | at /rustc/a55dd71d5fb0ec5a6a3a9e8c27b2127ba491ce52/library/alloc/src/boxed.rs:1935:9 +goerli_1 | as core::ops::function::FnOnce>::call_once::h41deef8e33b824bb +goerli_1 | at /rustc/a55dd71d5fb0ec5a6a3a9e8c27b2127ba491ce52/library/alloc/src/boxed.rs:1935:9 +goerli_1 | std::sys::unix::thread::Thread::new::thread_start::ha6436304a1170bba +goerli_1 | at /rustc/a55dd71d5fb0ec5a6a3a9e8c27b2127ba491ce52/library/std/src/sys/unix/thread.rs:108:17 +goerli_1 | 18: 0x7f3b309e9ea7 - start_thread +goerli_1 | 19: 0x7f3b307bfaef - clone +goerli_1 | 20: 0x0 - +goerli_1 | + +also saw deadlocks on other chains (arbitrum, goerli, gnosis, optimism, polygon, fantom). though luckily not on eth. and it seems like it kept going. +i'm going to guess that the problem is nested caches. +refactor to maybe use a dashmap at one level? or flatten into one level and use channels more diff --git a/entities/Cargo.toml b/entities/Cargo.toml index 14402b76..b35a7976 100644 --- a/entities/Cargo.toml +++ b/entities/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "entities" -version = "0.4.0" +version = "0.5.0" edition = "2021" [lib] diff --git a/entities/src/revert_logs.rs b/entities/src/revert_logs.rs index 5483dff4..2cc2454e 100644 --- a/entities/src/revert_logs.rs +++ b/entities/src/revert_logs.rs @@ -15,6 +15,7 @@ pub struct Model { pub to: Vec, #[sea_orm(column_type = "Text", nullable)] pub call_data: Option, + pub chain_id: u64, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/migration/Cargo.toml b/migration/Cargo.toml index 185448aa..8c89f6bb 100644 --- a/migration/Cargo.toml +++ b/migration/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "migration" -version = "0.4.0" +version = "0.5.0" edition = "2021" publish = false diff --git a/migration/src/lib.rs b/migration/src/lib.rs index 380bbe30..8a1b535b 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -4,6 +4,7 @@ mod m20220101_000001_create_table; mod m20220921_181610_log_reverts; mod m20220928_015108_concurrency_limits; mod m20221007_213828_accounting; +mod m20221025_210326_add_chain_id_to_reverts; pub struct Migrator; @@ -15,6 +16,7 @@ impl MigratorTrait for Migrator { Box::new(m20220921_181610_log_reverts::Migration), Box::new(m20220928_015108_concurrency_limits::Migration), Box::new(m20221007_213828_accounting::Migration), + Box::new(m20221025_210326_add_chain_id_to_reverts::Migration), ] } } diff --git a/migration/src/m20220928_015108_concurrency_limits.rs b/migration/src/m20220928_015108_concurrency_limits.rs index 76a50a49..9d0161e1 100644 --- a/migration/src/m20220928_015108_concurrency_limits.rs +++ b/migration/src/m20220928_015108_concurrency_limits.rs @@ -30,7 +30,6 @@ impl MigrationTrait for Migration { .alter_table( sea_query::Table::alter() .table(UserKeys::Table) - .to_owned() .drop_column(UserKeys::MaxConcurrentRequests) .to_owned(), ) diff --git a/migration/src/m20221025_210326_add_chain_id_to_reverts.rs b/migration/src/m20221025_210326_add_chain_id_to_reverts.rs new file mode 100644 index 00000000..9bc94ef3 --- /dev/null +++ b/migration/src/m20221025_210326_add_chain_id_to_reverts.rs @@ -0,0 +1,67 @@ +use sea_orm_migration::prelude::table::ColumnDef; +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // add a field to the UserKeys table + manager + .alter_table( + sea_query::Table::alter() + .table(RevertLogs::Table) + // add column for a better version of rate limiting + .add_column( + ColumnDef::new(RevertLogs::ChainId) + .big_unsigned() + .not_null() + // create it with a default of 1 + .default(1), + ) + .to_owned(), + ) + .await?; + + manager + .alter_table( + sea_query::Table::alter() + .table(RevertLogs::Table) + // remove the default + .modify_column( + ColumnDef::new(RevertLogs::ChainId) + .big_unsigned() + .not_null(), + ) + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // put the RevertLogs back to how it was before our migrations + manager + .alter_table( + sea_query::Table::alter() + .table(RevertLogs::Table) + .drop_column(RevertLogs::ChainId) + .to_owned(), + ) + .await + } +} + +#[derive(Iden)] +enum RevertLogs { + Table, + // Id, + // UserKeyId, + // Method, + // CallData, + // To, + // Timestamp, + ChainId, +} diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 03771dd2..d1da2de7 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "web3_proxy" -version = "0.4.0" +version = "0.5.0" edition = "2021" default-run = "web3_proxy" diff --git a/web3_proxy/src/bin/web3_proxy_cli/create_user.rs b/web3_proxy/src/bin/web3_proxy_cli/create_user.rs index c1c43292..30322cd8 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/create_user.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/create_user.rs @@ -6,34 +6,34 @@ use sea_orm::{ActiveModelTrait, TransactionTrait}; use tracing::info; use ulid::Ulid; use uuid::Uuid; -use web3_proxy::frontend::authorization::UserKey; +use web3_proxy::frontend::authorization::RpcApiKey; #[derive(FromArgs, PartialEq, Debug, Eq)] /// Create a new user and api key #[argh(subcommand, name = "create_user")] pub struct CreateUserSubCommand { - #[argh(option)] /// the user's ethereum address or descriptive string. /// If a string is given, it will be converted to hex and potentially truncated. /// Users from strings are only for testing since they won't be able to log in. + #[argh(option)] address: String, - #[argh(option)] /// the user's optional email. + #[argh(option)] email: Option, - #[argh(option, default = "UserKey::new()")] /// the user's first api ULID or UUID key. /// If none given, one will be created. - api_key: UserKey, - #[argh(option)] - /// maximum requests per minute. - /// default to "None" which the code sees as "unlimited" requests. + api_key: RpcApiKey, + + /// the key's maximum requests per minute. + /// Default to "None" which the code sees as "unlimited" requests. + #[argh(option)] rpm: Option, + /// an optional short description of the key's purpose. #[argh(option)] - /// a short description of the key's purpose description: Option, } diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 66c40456..fbe85b5f 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -4,12 +4,14 @@ use super::errors::FrontendErrorResponse; use crate::app::{UserKeyData, Web3ProxyApp}; use crate::jsonrpc::JsonRpcRequest; use anyhow::Context; +use axum::headers::authorization::Bearer; use axum::headers::{Origin, Referer, UserAgent}; use axum::TypedHeader; use chrono::Utc; use deferred_rate_limiter::DeferredRateLimitResult; -use entities::user_keys; +use entities::{user, user_keys}; use ipnet::IpNet; +use redis_rate_limiter::redis::AsyncCommands; use redis_rate_limiter::RedisRateLimitResult; use sea_orm::{prelude::Decimal, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter}; use serde::Serialize; @@ -25,7 +27,7 @@ use uuid::Uuid; /// This lets us use UUID and ULID while we transition to only ULIDs /// TODO: include the key's description. #[derive(Copy, Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub enum UserKey { +pub enum RpcApiKey { Ulid(Ulid), Uuid(Uuid), } @@ -104,13 +106,19 @@ impl RequestMetadata { } } -impl UserKey { +impl RpcApiKey { pub fn new() -> Self { Ulid::new().into() } } -impl Display for UserKey { +impl Default for RpcApiKey { + fn default() -> Self { + Self::new() + } +} + +impl Display for RpcApiKey { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { // TODO: do this without dereferencing let ulid: Ulid = (*self).into(); @@ -119,13 +127,7 @@ impl Display for UserKey { } } -impl Default for UserKey { - fn default() -> Self { - Self::new() - } -} - -impl FromStr for UserKey { +impl FromStr for RpcApiKey { type Err = anyhow::Error; fn from_str(s: &str) -> Result { @@ -140,32 +142,32 @@ impl FromStr for UserKey { } } -impl From for UserKey { +impl From for RpcApiKey { fn from(x: Ulid) -> Self { - UserKey::Ulid(x) + RpcApiKey::Ulid(x) } } -impl From for UserKey { +impl From for RpcApiKey { fn from(x: Uuid) -> Self { - UserKey::Uuid(x) + RpcApiKey::Uuid(x) } } -impl From for Ulid { - fn from(x: UserKey) -> Self { +impl From for Ulid { + fn from(x: RpcApiKey) -> Self { match x { - UserKey::Ulid(x) => x, - UserKey::Uuid(x) => Ulid::from(x.as_u128()), + RpcApiKey::Ulid(x) => x, + RpcApiKey::Uuid(x) => Ulid::from(x.as_u128()), } } } -impl From for Uuid { - fn from(x: UserKey) -> Self { +impl From for Uuid { + fn from(x: RpcApiKey) -> Self { match x { - UserKey::Ulid(x) => Uuid::from_u128(x.0), - UserKey::Uuid(x) => x, + RpcApiKey::Ulid(x) => Uuid::from_u128(x.0), + RpcApiKey::Uuid(x) => x, } } } @@ -298,7 +300,7 @@ pub async fn ip_is_authorized( pub async fn key_is_authorized( app: &Web3ProxyApp, - user_key: UserKey, + user_key: RpcApiKey, ip: IpAddr, origin: Option, referer: Option, @@ -373,6 +375,47 @@ impl Web3ProxyApp { } } + /// Verify that the given bearer token and address are allowed to take the specified action. + /// This includes concurrent request limiting. + pub async fn bearer_is_authorized( + &self, + bearer: Bearer, + ) -> anyhow::Result<(user::Model, OwnedSemaphorePermit)> { + // limit concurrent requests + let semaphore = self + .bearer_token_semaphores + .get_with(bearer.token().to_string(), async move { + let s = Semaphore::new(self.config.bearer_token_max_concurrent_requests as usize); + Arc::new(s) + }) + .await; + + let semaphore_permit = semaphore.acquire_owned().await?; + + // get the user id for this bearer token + // TODO: move redis key building to a helper function + let bearer_cache_key = format!("bearer:{}", bearer.token()); + + // get the attached address from redis for the given auth_token. + let mut redis_conn = self.redis_conn().await?; + + let user_id: u64 = redis_conn + .get::<_, Option>(bearer_cache_key) + .await + .context("fetching bearer cache key from redis")? + .context("unknown bearer token")?; + + // turn user id into a user + let db_conn = self.db_conn().context("Getting database connection")?; + let user = user::Entity::find_by_id(user_id) + .one(&db_conn) + .await + .context("fetching user from db by id")? + .context("unknown user id")?; + + Ok((user, semaphore_permit)) + } + pub async fn rate_limit_login(&self, ip: IpAddr) -> anyhow::Result { // TODO: dry this up with rate_limit_by_key // TODO: do we want a semaphore here? @@ -454,7 +497,7 @@ impl Web3ProxyApp { } // check the local cache for user data, or query the database - pub(crate) async fn user_data(&self, user_key: UserKey) -> anyhow::Result { + pub(crate) async fn user_data(&self, user_key: RpcApiKey) -> anyhow::Result { let user_data: Result<_, Arc> = self .user_key_cache .try_get_with(user_key.into(), async move { @@ -539,7 +582,7 @@ impl Web3ProxyApp { user_data.map_err(|err| anyhow::anyhow!(err)) } - pub async fn rate_limit_by_key(&self, user_key: UserKey) -> anyhow::Result { + pub async fn rate_limit_by_key(&self, user_key: RpcApiKey) -> anyhow::Result { let user_data = self.user_data(user_key).await?; if user_data.user_key_id == 0 { diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index a68d5589..d0a860f1 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -69,10 +69,12 @@ pub async fn serve(port: u16, proxy_app: Arc) -> anyhow::Result<() get(users::user_login_get), ) .route("/user/login", post(users::user_login_post)) + .route("/user", get(users::user_get)) + .route("/user", post(users::user_post)) .route("/user/balance", get(users::user_balance_get)) .route("/user/balance/:txid", post(users::user_balance_post)) - .route("/user/profile", get(users::user_profile_get)) - .route("/user/profile", post(users::user_profile_post)) + .route("/user/keys", get(users::user_keys_get)) + .route("/user/keys", post(users::user_keys_post)) .route("/user/revert_logs", get(users::user_revert_logs_get)) .route( "/user/stats/aggregate", diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index c988353b..6276455f 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -1,9 +1,13 @@ //! Handle registration, logins, and managing account data. -use super::authorization::{login_is_authorized, UserKey}; +use super::authorization::{login_is_authorized, RpcApiKey}; use super::errors::FrontendResult; use crate::app::Web3ProxyApp; -use crate::user_queries::{get_aggregate_rpc_stats_from_params, get_detailed_stats}; +use crate::user_queries::{ + get_aggregate_rpc_stats_from_params, get_detailed_stats, get_page_from_params, + get_query_window_seconds_from_params, +}; +use crate::user_queries::{get_chain_id_from_params, get_query_start_from_params}; use anyhow::Context; use axum::{ extract::{Path, Query}, @@ -13,12 +17,15 @@ use axum::{ }; use axum_client_ip::ClientIp; use axum_macros::debug_handler; -use entities::{user, user_keys}; +use entities::{revert_logs, user, user_keys}; use ethers::{prelude::Address, types::Bytes}; use hashbrown::HashMap; use http::StatusCode; use redis_rate_limiter::redis::AsyncCommands; -use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, TransactionTrait}; +use sea_orm::{ + ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder, + QuerySelect, TransactionTrait, +}; use serde::Deserialize; use serde_json::json; use siwe::{Message, VerificationOpts}; @@ -42,23 +49,21 @@ use ulid::Ulid; /// This is the initial entrypoint for logging in. Take the response from this endpoint and give it to your user's wallet for singing. POST the response to `/user/login`. /// /// Rate limited by IP address. +/// +/// At first i thought about checking that user_address is in our db, +/// But theres no need to separate the registration and login flows. +/// It is a better UX to just click "login with ethereum" and have the account created if it doesn't exist. +/// We can prompt for an email and and payment after they log in. #[debug_handler] pub async fn user_login_get( Extension(app): Extension>, ClientIp(ip): ClientIp, // TODO: what does axum's error handling look like if the path fails to parse? - // TODO: allow ENS names here? Path(mut params): Path>, ) -> FrontendResult { login_is_authorized(&app, ip).await?; - // at first i thought about checking that user_address is in our db - // but theres no need to separate the registration and login flows - // its a better UX to just click "login with ethereum" and have the account created if it doesn't exist - // we can prompt for an email and and payment after they log in - // create a message and save it in redis - // TODO: how many seconds? get from config? let expire_seconds: usize = 20 * 60; @@ -68,6 +73,7 @@ pub async fn user_login_get( let expiration_time = issued_at.add(Duration::new(expire_seconds as i64, 0)); + // TODO: allow ENS names here? let user_address: Address = params .remove("user_address") // TODO: map_err so this becomes a 500. routing must be bad @@ -249,12 +255,13 @@ pub async fn user_login_post( // create the user's first api key // TODO: rename to UserApiKey? RpcApiKey? - let user_key = UserKey::new(); + let rpc_key = RpcApiKey::new(); // TODO: variable requests per minute depending on the invite code let uk = user_keys::ActiveModel { user_id: sea_orm::Set(u.id), - api_key: sea_orm::Set(user_key.into()), + api_key: sea_orm::Set(rpc_key.into()), + description: sea_orm::Set(Some("first".to_string())), requests_per_minute: sea_orm::Set(app.config.default_user_requests_per_minute), ..Default::default() }; @@ -286,13 +293,15 @@ pub async fn user_login_post( // create a bearer token for the user. let bearer_token = Ulid::new(); + // json response with everything in it + // we could return just the bearer token, but I think they will always request api keys and the user profile let response_json = json!({ "api_keys": uks .into_iter() .map(|uk| (uk.id, uk)) .collect::>(), "bearer_token": bearer_token, - "user_id": u.id, + "user": u, }); let response = (status_code, Json(response_json)).into_response(); @@ -302,9 +311,7 @@ pub async fn user_login_post( let bearer_redis_key = format!("bearer:{}", bearer_token); // expire in 4 weeks - // TODO: do this with a pipe // TODO: get expiration time from app config - // TODO: do we use this? redis_conn .set_ex(bearer_redis_key, u.id.to_string(), 2_419_200) .await?; @@ -339,56 +346,10 @@ pub async fn user_logout_post( /// the JSON input to the `post_user` handler. #[derive(Deserialize)] pub struct UserProfilePost { - primary_address: Address, - new_primary_address: Option
, // TODO: make sure the email address is valid. probably have a "verified" column in the database email: Option, } -/// `POST /user/profile` -- modify the account connected to the bearer token in the `Authentication` header. -#[debug_handler] -pub async fn user_profile_post( - Extension(app): Extension>, - TypedHeader(Authorization(bearer_token)): TypedHeader>, - Json(payload): Json, -) -> FrontendResult { - let user = ProtectedAction::UserProfilePost(payload.primary_address) - .authorize(app.as_ref(), bearer_token) - .await?; - - let mut user: user::ActiveModel = user.into(); - - // TODO: require a message from the new address to finish the change - if let Some(new_primary_address) = payload.new_primary_address { - if new_primary_address.is_zero() { - // TODO: allow this if some other authentication method is set - return Err(anyhow::anyhow!("cannot clear primary address").into()); - } else { - let new_primary_address = Vec::from(new_primary_address.as_ref()); - - user.address = sea_orm::Set(new_primary_address) - } - } - - if let Some(x) = payload.email { - // TODO: only Set if no change - if x.is_empty() { - user.email = sea_orm::Set(None); - } else { - // TODO: do some basic validation - // TODO: don't set immediatly, send a confirmation email first - user.email = sea_orm::Set(Some(x)); - } - } - - let db_conn = app.db_conn().context("Getting database connection")?; - - user.save(&db_conn).await?; - - // TODO: what should this return? the user? - Ok("success".into_response()) -} - /// `GET /user/balance` -- Use a bearer token to get the user's balance and spend. /// /// - show balance in USD @@ -427,9 +388,7 @@ pub async fn user_keys_get( Extension(app): Extension>, TypedHeader(Authorization(bearer_token)): TypedHeader>, ) -> FrontendResult { - let user = ProtectedAction::UserKeys - .authorize(app.as_ref(), bearer_token) - .await?; + let (user, _semaphore) = app.bearer_is_authorized(bearer_token).await?; let db_conn = app.db_conn().context("getting db to fetch user's keys")?; @@ -441,11 +400,11 @@ pub async fn user_keys_get( // TODO: stricter type on this? let response_json = json!({ - "api_keys": uks + "user_id": user.id, + "user_rpc_keys": uks .into_iter() .map(|uk| (uk.id, uk)) .collect::>(), - "user_id": user.id, }); Ok(Json(response_json).into_response()) @@ -463,17 +422,59 @@ pub async fn user_keys_post( todo!("user_keys_post"); } -/// `GET /user/profile` -- Use a bearer token to get the user's profile. +/// `GET /user` -- Use a bearer token to get the user's profile. /// /// - the email address of a user if they opted in to get contacted via email /// /// TODO: this will change as we add better support for secondary users. #[debug_handler] -pub async fn user_profile_get( +pub async fn user_get( Extension(app): Extension>, TypedHeader(Authorization(bearer_token)): TypedHeader>, ) -> FrontendResult { - todo!("user_profile_get"); + let (user, _semaphore) = app.bearer_is_authorized(bearer_token).await?; + + Ok(Json(user).into_response()) +} + +/// `POST /user` -- modify the account connected to the bearer token in the `Authentication` header. +#[debug_handler] +pub async fn user_post( + Extension(app): Extension>, + TypedHeader(Authorization(bearer_token)): TypedHeader>, + Json(payload): Json, +) -> FrontendResult { + let (user, _semaphore) = app.bearer_is_authorized(bearer_token).await?; + + let mut user: user::ActiveModel = user.into(); + + // update the email address + if let Some(x) = payload.email { + // TODO: only Set if no change + if x.is_empty() { + user.email = sea_orm::Set(None); + } else { + // TODO: do some basic validation + // TODO: don't set immediatly, send a confirmation email first + // TODO: compare first? or is sea orm smart enough to do that for us? + user.email = sea_orm::Set(Some(x)); + } + } + + // TODO: what else can we update here? password hash? subscription to newsletter? + + let user = if user.is_changed() { + let db_conn = app.db_conn().context("Getting database connection")?; + + user.save(&db_conn).await? + } else { + // no changes. no need to touch the database + user + }; + + let user: user::Model = user.try_into().context("Returning updated user")?; + + Ok(Json(user).into_response()) } /// `GET /user/revert_logs` -- Use a bearer token to get the user's revert logs. @@ -481,8 +482,54 @@ pub async fn user_profile_get( pub async fn user_revert_logs_get( Extension(app): Extension>, TypedHeader(Authorization(bearer_token)): TypedHeader>, + Query(params): Query>, ) -> FrontendResult { - todo!("user_revert_logs_get"); + let (user, _semaphore) = app.bearer_is_authorized(bearer_token).await?; + + let chain_id = get_chain_id_from_params(app.as_ref(), ¶ms)?; + let query_start = get_query_start_from_params(¶ms)?; + let page = get_page_from_params(¶ms)?; + + // TODO: page size from config + let page_size = 200; + + let mut response = HashMap::new(); + + response.insert("page", json!(page)); + response.insert("page_size", json!(page_size)); + response.insert("chain_id", json!(chain_id)); + response.insert("query_start", json!(query_start.timestamp() as u64)); + + let db_conn = app.db_conn().context("getting db for user's revert logs")?; + + let uks = user_keys::Entity::find() + .filter(user_keys::Column::UserId.eq(user.id)) + .all(&db_conn) + .await + .context("failed loading user's key")?; + + // TODO: only select the ids + let uks: Vec<_> = uks.into_iter().map(|x| x.id).collect(); + + // get paginated logs + let q = revert_logs::Entity::find() + .filter(revert_logs::Column::Timestamp.gte(query_start)) + .filter(revert_logs::Column::UserKeyId.is_in(uks)) + .order_by_asc(revert_logs::Column::Timestamp); + + let q = if chain_id == 0 { + // don't do anything + q + } else { + // filter on chain id + q.filter(revert_logs::Column::ChainId.eq(chain_id)) + }; + + let revert_logs = q.paginate(&db_conn, page_size).fetch_page(page).await?; + + response.insert("revert_logs", json!(revert_logs)); + + Ok(Json(response).into_response()) } /// `GET /user/stats/detailed` -- Use a bearer token to get the user's key stats such as bandwidth used and methods requested. @@ -516,65 +563,3 @@ pub async fn user_stats_aggregate_get( Ok(Json(x).into_response()) } - -/// `GET /user/profile` -- Use a bearer token to get the user's profile such as their optional email address. -/// Handle authorization for a given address and bearer token. -// TODO: what roles should exist? -enum ProtectedAction { - UserKeys, - UserProfilePost(Address), -} - -impl ProtectedAction { - /// Verify that the given bearer token and address are allowed to take the specified action. - /// This includes concurrent request limiting. - async fn authorize(self, app: &Web3ProxyApp, bearer: Bearer) -> anyhow::Result { - // get the attached address from redis for the given auth_token. - let mut redis_conn = app.redis_conn().await?; - - // limit concurrent requests - let semaphore = app - .bearer_token_semaphores - .get_with(bearer.token().to_string(), async move { - let s = Semaphore::new(app.config.bearer_token_max_concurrent_requests as usize); - Arc::new(s) - }) - .await; - let _semaphore_permit = semaphore.acquire().await?; - - // get the user id for this bearer token - // TODO: move redis key building to a helper function - let bearer_cache_key = format!("bearer:{}", bearer.token()); - - // TODO: move this to a helper function - let user_id: u64 = redis_conn - .get::<_, Option>(bearer_cache_key) - .await - .context("fetching bearer cache key from redis")? - .context("unknown bearer token")?; - - // turn user id into a user - let db_conn = app.db_conn().context("Getting database connection")?; - let user = user::Entity::find_by_id(user_id) - .one(&db_conn) - .await - .context("fetching user from db by id")? - .context("unknown user id")?; - - match self { - Self::UserKeys => { - // no extra checks needed. bearer token gave us a user - } - Self::UserProfilePost(primary_address) => { - let user_address = Address::from_slice(&user.address); - - if user_address != primary_address { - // TODO: check secondary users - return Err(anyhow::anyhow!("user address mismatch")); - } - } - } - - Ok(user) - } -} diff --git a/web3_proxy/src/user_queries.rs b/web3_proxy/src/user_queries.rs index a4721e1b..f566a1c2 100644 --- a/web3_proxy/src/user_queries.rs +++ b/web3_proxy/src/user_queries.rs @@ -56,7 +56,7 @@ async fn get_user_id_from_params( /// only allow user_key to be set if user_id is also set. /// this will keep people from reading someone else's keys. /// 0 means none. -fn get_user_key_id_from_params( +pub fn get_user_key_id_from_params( user_id: u64, params: &HashMap, ) -> anyhow::Result { @@ -74,7 +74,7 @@ fn get_user_key_id_from_params( } } -fn get_chain_id_from_params( +pub fn get_chain_id_from_params( app: &Web3ProxyApp, params: &HashMap, ) -> anyhow::Result { @@ -88,7 +88,7 @@ fn get_chain_id_from_params( ) } -fn get_query_start_from_params( +pub fn get_query_start_from_params( params: &HashMap, ) -> anyhow::Result { params.get("query_start").map_or_else( @@ -111,7 +111,7 @@ fn get_query_start_from_params( ) } -fn get_page_from_params(params: &HashMap) -> anyhow::Result { +pub fn get_page_from_params(params: &HashMap) -> anyhow::Result { params.get("page").map_or_else::, _, _>( || { // no page in params. set default @@ -127,7 +127,9 @@ fn get_page_from_params(params: &HashMap) -> anyhow::Result ) } -fn get_query_window_seconds_from_params(params: &HashMap) -> anyhow::Result { +pub fn get_query_window_seconds_from_params( + params: &HashMap, +) -> anyhow::Result { params.get("query_window_seconds").map_or_else( || { // no page in params. set default @@ -179,12 +181,6 @@ pub async fn get_aggregate_rpc_stats_from_params( serde_json::to_value(query_start.timestamp() as u64)?, ); - if query_window_seconds != 0 { - response.insert( - "query_window_seconds", - serde_json::to_value(query_window_seconds)?, - ); - } // TODO: how do we get count reverts compared to other errors? does it matter? what about http errors to our users? // TODO: how do we count uptime? let q = rpc_accounting::Entity::find() @@ -467,143 +463,3 @@ pub async fn get_detailed_stats( Ok(response) } - -/// revert logs for a single key -/// -/// TODO: take a "timebucket" duration in minutes that will make a more advanced -pub async fn get_revert_logs( - app: &Web3ProxyApp, - bearer: Option>>, - params: HashMap, -) -> anyhow::Result> { - let db_conn = app.db_conn().context("connecting to db")?; - let redis_conn = app.redis_conn().await.context("connecting to redis")?; - - let user_id = get_user_id_from_params(redis_conn, bearer, ¶ms).await?; - let user_key_id = get_user_key_id_from_params(user_id, ¶ms)?; - let chain_id = get_chain_id_from_params(app, ¶ms)?; - let query_start = get_query_start_from_params(¶ms)?; - let query_window_seconds = get_query_window_seconds_from_params(¶ms)?; - let page = get_page_from_params(¶ms)?; - let page_size = get_page_from_params(¶ms)?; - - let mut response = HashMap::new(); - - response.insert("page", serde_json::to_value(page)?); - response.insert("page_size", serde_json::to_value(page_size)?); - response.insert("chain_id", serde_json::to_value(chain_id)?); - response.insert( - "query_start", - serde_json::to_value(query_start.timestamp() as u64)?, - ); - - if query_window_seconds != 0 { - response.insert( - "query_window_seconds", - serde_json::to_value(query_window_seconds)?, - ); - } - - // TODO: how do we get count reverts compared to other errors? does it matter? what about http errors to our users? - // TODO: how do we count uptime? - let q = rpc_accounting::Entity::find() - .select_only() - // groups - .column(rpc_accounting::Column::ErrorResponse) - .group_by(rpc_accounting::Column::ErrorResponse) - .column(rpc_accounting::Column::Method) - .group_by(rpc_accounting::Column::Method) - // aggregate columns - .column_as( - rpc_accounting::Column::FrontendRequests.sum(), - "total_requests", - ) - .column_as( - rpc_accounting::Column::CacheMisses.sum(), - "total_cache_misses", - ) - .column_as(rpc_accounting::Column::CacheHits.sum(), "total_cache_hits") - .column_as( - rpc_accounting::Column::BackendRetries.sum(), - "total_backend_retries", - ) - .column_as( - rpc_accounting::Column::SumResponseBytes.sum(), - "total_response_bytes", - ) - .column_as( - // TODO: can we sum bools like this? - rpc_accounting::Column::ErrorResponse.sum(), - "total_error_responses", - ) - .column_as( - rpc_accounting::Column::SumResponseMillis.sum(), - "total_response_millis", - ) - // TODO: order on method next? - .order_by_asc(rpc_accounting::Column::PeriodDatetime.min()); - - let condition = Condition::all().add(rpc_accounting::Column::PeriodDatetime.gte(query_start)); - - let (condition, q) = if chain_id.is_zero() { - // fetch all the chains. don't filter - // TODO: wait. do we want chain id on the logs? we can get that by joining key - let q = q - .column(rpc_accounting::Column::ChainId) - .group_by(rpc_accounting::Column::ChainId); - - (condition, q) - } else { - let condition = condition.add(rpc_accounting::Column::ChainId.eq(chain_id)); - - (condition, q) - }; - - let (condition, q) = if user_id == 0 { - // 0 means everyone. don't filter on user - (condition, q) - } else { - // TODO: move authentication here? - // TODO: what about keys where this user is a secondary user? - let q = q - .join( - JoinType::InnerJoin, - rpc_accounting::Relation::UserKeys.def(), - ) - .column(user_keys::Column::UserId) - // no need to group_by user_id when we are grouping by key_id - // .group_by(user_keys::Column::UserId) - .column(user_keys::Column::Id) - .group_by(user_keys::Column::Id); - - let condition = condition.add(user_keys::Column::UserId.eq(user_id)); - - if user_key_id != 0 { - todo!("wip"); - } - - (condition, q) - }; - - let q = q.filter(condition); - - // TODO: enum between searching on user_key_id on user_id - // TODO: handle secondary users, too - - // log query here. i think sea orm has a useful log level for this - - // TODO: transform this into a nested hashmap instead of a giant table? - let r = q - .into_json() - .paginate(&db_conn, page_size) - .fetch_page(page) - .await?; - - response.insert("detailed_aggregate", serde_json::Value::Array(r)); - - // number of keys - // number of secondary keys - // avg and max concurrent requests per second per api key - - Ok(response) -}