more user endpoints

This commit is contained in:
Bryan Stitt 2022-10-25 21:10:05 +00:00
parent d31484467d
commit 8230172366
14 changed files with 319 additions and 328 deletions

6
Cargo.lock generated
View File

@ -1329,7 +1329,7 @@ dependencies = [
[[package]]
name = "entities"
version = "0.4.0"
version = "0.5.0"
dependencies = [
"sea-orm",
"serde",
@ -2632,7 +2632,7 @@ dependencies = [
[[package]]
name = "migration"
version = "0.4.0"
version = "0.5.0"
dependencies = [
"sea-orm-migration",
"tokio",
@ -5466,7 +5466,7 @@ dependencies = [
[[package]]
name = "web3_proxy"
version = "0.4.0"
version = "0.5.0"
dependencies = [
"anyhow",
"arc-swap",

36
bugs.md Normal file
View File

@ -0,0 +1,36 @@
# deadlock
goerli_1 | 1 deadlocks detected
goerli_1 | Deadlock #0
goerli_1 | Thread Id 139881298757376
goerli_1 | 0: 0x5608f7f762d9 - backtrace::backtrace::trace::hbe74611947a262af
goerli_1 | 1: 0x5608f7f7a967 - backtrace::capture::Backtrace::new::h667fe9ee7ec04c33
goerli_1 | 2: 0x5608f7f6ed33 - parking_lot_core::parking_lot::deadlock_impl::on_unpark::h78879313dd6461e5
goerli_1 | 3: 0x5608f730dcd4 - parking_lot::raw_mutex::RawMutex::lock_slow::h9c58bf1ec322b8f6
goerli_1 | 4: 0x5608f78f2e87 - <moka::common::concurrent::housekeeper::ThreadPoolHousekeeper<T> as core::ops::drop::Drop>::drop::h4887dbe8ef7d7472
goerli_1 | 5: 0x5608f7909362 - alloc::sync::Arc<T>::drop_slow::h3de3d854b76812ea
goerli_1 | 6: 0x5608f7919596 - core::ptr::drop_in_place<moka::future::cache::Cache<web3_proxy::app_stats::UserProxyResponseKey,alloc::sync::Arc<web3_proxy::app_stats::ProxyResponseAggregate>,ahash::random_state::RandomState>>::h1bf4d8ebf87406ed
goerli_1 | 7: 0x5608f791ac00 - triomphe::arc::Arc<T>::drop_slow::h246e78aee1f2a265
goerli_1 | 8: 0x5608f78e38bd - crossbeam_epoch::deferred::Deferred::new::call::h395b93588d5e21a9
goerli_1 | 9: 0x5608f72fbaa2 - crossbeam_epoch::internal::Global::collect::h77479fc8b8898340
goerli_1 | 10: 0x5608f73ef22c - <moka::sync_base::base_cache::Inner<K,V,S> as moka::common::concurrent::housekeeper::InnerSync>::sync::h07f3f4f6db1c2598
goerli_1 | 11: 0x5608f75e4ee3 - moka::common::concurrent::housekeeper::ThreadPoolHousekeeper<T>::call_sync::h11b70044870c94f4
goerli_1 | 12: 0x5608f75e4b03 - moka::common::concurrent::housekeeper::ThreadPoolHousekeeper<T>::start_periodical_sync_job::{{closure}}::hdc1c253b1b156548
goerli_1 | 13: 0x5608f7cc8d15 - scheduled_thread_pool::Worker::run_job::hb3ae60b61103071b
goerli_1 | 14: 0x5608f7cc8b8b - scheduled_thread_pool::Worker::run::h760e10fe3281c379
goerli_1 | 15: 0x5608f7ccb294 - std::sys_common::backtrace::__rust_begin_short_backtrace::hc3b55a28c2ef3a5f
goerli_1 | 16: 0x5608f7cc9cb5 - core::ops::function::FnOnce::call_once{{vtable.shim}}::hf330c4157d74cf0e
goerli_1 | 17: 0x5608f7fc8dd3 - <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once::h56d5fc072706762b
goerli_1 | at /rustc/a55dd71d5fb0ec5a6a3a9e8c27b2127ba491ce52/library/alloc/src/boxed.rs:1935:9
goerli_1 | <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once::h41deef8e33b824bb
goerli_1 | at /rustc/a55dd71d5fb0ec5a6a3a9e8c27b2127ba491ce52/library/alloc/src/boxed.rs:1935:9
goerli_1 | std::sys::unix::thread::Thread::new::thread_start::ha6436304a1170bba
goerli_1 | at /rustc/a55dd71d5fb0ec5a6a3a9e8c27b2127ba491ce52/library/std/src/sys/unix/thread.rs:108:17
goerli_1 | 18: 0x7f3b309e9ea7 - start_thread
goerli_1 | 19: 0x7f3b307bfaef - clone
goerli_1 | 20: 0x0 - <unknown>
goerli_1 |
also saw deadlocks on other chains (arbitrum, goerli, gnosis, optimism, polygon, fantom). though luckily not on eth. and it seems like it kept going.
i'm going to guess that the problem is nested caches.
refactor to maybe use a dashmap at one level? or flatten into one level and use channels more

View File

@ -1,6 +1,6 @@
[package]
name = "entities"
version = "0.4.0"
version = "0.5.0"
edition = "2021"
[lib]

View File

@ -15,6 +15,7 @@ pub struct Model {
pub to: Vec<u8>,
#[sea_orm(column_type = "Text", nullable)]
pub call_data: Option<String>,
pub chain_id: u64,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

View File

@ -1,6 +1,6 @@
[package]
name = "migration"
version = "0.4.0"
version = "0.5.0"
edition = "2021"
publish = false

View File

@ -4,6 +4,7 @@ mod m20220101_000001_create_table;
mod m20220921_181610_log_reverts;
mod m20220928_015108_concurrency_limits;
mod m20221007_213828_accounting;
mod m20221025_210326_add_chain_id_to_reverts;
pub struct Migrator;
@ -15,6 +16,7 @@ impl MigratorTrait for Migrator {
Box::new(m20220921_181610_log_reverts::Migration),
Box::new(m20220928_015108_concurrency_limits::Migration),
Box::new(m20221007_213828_accounting::Migration),
Box::new(m20221025_210326_add_chain_id_to_reverts::Migration),
]
}
}

View File

@ -30,7 +30,6 @@ impl MigrationTrait for Migration {
.alter_table(
sea_query::Table::alter()
.table(UserKeys::Table)
.to_owned()
.drop_column(UserKeys::MaxConcurrentRequests)
.to_owned(),
)

View File

@ -0,0 +1,67 @@
use sea_orm_migration::prelude::table::ColumnDef;
use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// add a field to the UserKeys table
manager
.alter_table(
sea_query::Table::alter()
.table(RevertLogs::Table)
// add column for a better version of rate limiting
.add_column(
ColumnDef::new(RevertLogs::ChainId)
.big_unsigned()
.not_null()
// create it with a default of 1
.default(1),
)
.to_owned(),
)
.await?;
manager
.alter_table(
sea_query::Table::alter()
.table(RevertLogs::Table)
// remove the default
.modify_column(
ColumnDef::new(RevertLogs::ChainId)
.big_unsigned()
.not_null(),
)
.to_owned(),
)
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// put the RevertLogs back to how it was before our migrations
manager
.alter_table(
sea_query::Table::alter()
.table(RevertLogs::Table)
.drop_column(RevertLogs::ChainId)
.to_owned(),
)
.await
}
}
#[derive(Iden)]
enum RevertLogs {
Table,
// Id,
// UserKeyId,
// Method,
// CallData,
// To,
// Timestamp,
ChainId,
}

View File

@ -1,6 +1,6 @@
[package]
name = "web3_proxy"
version = "0.4.0"
version = "0.5.0"
edition = "2021"
default-run = "web3_proxy"

View File

@ -6,34 +6,34 @@ use sea_orm::{ActiveModelTrait, TransactionTrait};
use tracing::info;
use ulid::Ulid;
use uuid::Uuid;
use web3_proxy::frontend::authorization::UserKey;
use web3_proxy::frontend::authorization::RpcApiKey;
#[derive(FromArgs, PartialEq, Debug, Eq)]
/// Create a new user and api key
#[argh(subcommand, name = "create_user")]
pub struct CreateUserSubCommand {
#[argh(option)]
/// the user's ethereum address or descriptive string.
/// If a string is given, it will be converted to hex and potentially truncated.
/// Users from strings are only for testing since they won't be able to log in.
#[argh(option)]
address: String,
#[argh(option)]
/// the user's optional email.
#[argh(option)]
email: Option<String>,
#[argh(option, default = "UserKey::new()")]
/// the user's first api ULID or UUID key.
/// If none given, one will be created.
api_key: UserKey,
#[argh(option)]
/// maximum requests per minute.
/// default to "None" which the code sees as "unlimited" requests.
api_key: RpcApiKey,
/// the key's maximum requests per minute.
/// Default to "None" which the code sees as "unlimited" requests.
#[argh(option)]
rpm: Option<u64>,
/// an optional short description of the key's purpose.
#[argh(option)]
/// a short description of the key's purpose
description: Option<String>,
}

View File

@ -4,12 +4,14 @@ use super::errors::FrontendErrorResponse;
use crate::app::{UserKeyData, Web3ProxyApp};
use crate::jsonrpc::JsonRpcRequest;
use anyhow::Context;
use axum::headers::authorization::Bearer;
use axum::headers::{Origin, Referer, UserAgent};
use axum::TypedHeader;
use chrono::Utc;
use deferred_rate_limiter::DeferredRateLimitResult;
use entities::user_keys;
use entities::{user, user_keys};
use ipnet::IpNet;
use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::RedisRateLimitResult;
use sea_orm::{prelude::Decimal, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use serde::Serialize;
@ -25,7 +27,7 @@ use uuid::Uuid;
/// This lets us use UUID and ULID while we transition to only ULIDs
/// TODO: include the key's description.
#[derive(Copy, Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub enum UserKey {
pub enum RpcApiKey {
Ulid(Ulid),
Uuid(Uuid),
}
@ -104,13 +106,19 @@ impl RequestMetadata {
}
}
impl UserKey {
impl RpcApiKey {
pub fn new() -> Self {
Ulid::new().into()
}
}
impl Display for UserKey {
impl Default for RpcApiKey {
fn default() -> Self {
Self::new()
}
}
impl Display for RpcApiKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// TODO: do this without dereferencing
let ulid: Ulid = (*self).into();
@ -119,13 +127,7 @@ impl Display for UserKey {
}
}
impl Default for UserKey {
fn default() -> Self {
Self::new()
}
}
impl FromStr for UserKey {
impl FromStr for RpcApiKey {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
@ -140,32 +142,32 @@ impl FromStr for UserKey {
}
}
impl From<Ulid> for UserKey {
impl From<Ulid> for RpcApiKey {
fn from(x: Ulid) -> Self {
UserKey::Ulid(x)
RpcApiKey::Ulid(x)
}
}
impl From<Uuid> for UserKey {
impl From<Uuid> for RpcApiKey {
fn from(x: Uuid) -> Self {
UserKey::Uuid(x)
RpcApiKey::Uuid(x)
}
}
impl From<UserKey> for Ulid {
fn from(x: UserKey) -> Self {
impl From<RpcApiKey> for Ulid {
fn from(x: RpcApiKey) -> Self {
match x {
UserKey::Ulid(x) => x,
UserKey::Uuid(x) => Ulid::from(x.as_u128()),
RpcApiKey::Ulid(x) => x,
RpcApiKey::Uuid(x) => Ulid::from(x.as_u128()),
}
}
}
impl From<UserKey> for Uuid {
fn from(x: UserKey) -> Self {
impl From<RpcApiKey> for Uuid {
fn from(x: RpcApiKey) -> Self {
match x {
UserKey::Ulid(x) => Uuid::from_u128(x.0),
UserKey::Uuid(x) => x,
RpcApiKey::Ulid(x) => Uuid::from_u128(x.0),
RpcApiKey::Uuid(x) => x,
}
}
}
@ -298,7 +300,7 @@ pub async fn ip_is_authorized(
pub async fn key_is_authorized(
app: &Web3ProxyApp,
user_key: UserKey,
user_key: RpcApiKey,
ip: IpAddr,
origin: Option<Origin>,
referer: Option<Referer>,
@ -373,6 +375,47 @@ impl Web3ProxyApp {
}
}
/// Verify that the given bearer token and address are allowed to take the specified action.
/// This includes concurrent request limiting.
pub async fn bearer_is_authorized(
&self,
bearer: Bearer,
) -> anyhow::Result<(user::Model, OwnedSemaphorePermit)> {
// limit concurrent requests
let semaphore = self
.bearer_token_semaphores
.get_with(bearer.token().to_string(), async move {
let s = Semaphore::new(self.config.bearer_token_max_concurrent_requests as usize);
Arc::new(s)
})
.await;
let semaphore_permit = semaphore.acquire_owned().await?;
// get the user id for this bearer token
// TODO: move redis key building to a helper function
let bearer_cache_key = format!("bearer:{}", bearer.token());
// get the attached address from redis for the given auth_token.
let mut redis_conn = self.redis_conn().await?;
let user_id: u64 = redis_conn
.get::<_, Option<u64>>(bearer_cache_key)
.await
.context("fetching bearer cache key from redis")?
.context("unknown bearer token")?;
// turn user id into a user
let db_conn = self.db_conn().context("Getting database connection")?;
let user = user::Entity::find_by_id(user_id)
.one(&db_conn)
.await
.context("fetching user from db by id")?
.context("unknown user id")?;
Ok((user, semaphore_permit))
}
pub async fn rate_limit_login(&self, ip: IpAddr) -> anyhow::Result<RateLimitResult> {
// TODO: dry this up with rate_limit_by_key
// TODO: do we want a semaphore here?
@ -454,7 +497,7 @@ impl Web3ProxyApp {
}
// check the local cache for user data, or query the database
pub(crate) async fn user_data(&self, user_key: UserKey) -> anyhow::Result<UserKeyData> {
pub(crate) async fn user_data(&self, user_key: RpcApiKey) -> anyhow::Result<UserKeyData> {
let user_data: Result<_, Arc<anyhow::Error>> = self
.user_key_cache
.try_get_with(user_key.into(), async move {
@ -539,7 +582,7 @@ impl Web3ProxyApp {
user_data.map_err(|err| anyhow::anyhow!(err))
}
pub async fn rate_limit_by_key(&self, user_key: UserKey) -> anyhow::Result<RateLimitResult> {
pub async fn rate_limit_by_key(&self, user_key: RpcApiKey) -> anyhow::Result<RateLimitResult> {
let user_data = self.user_data(user_key).await?;
if user_data.user_key_id == 0 {

View File

@ -69,10 +69,12 @@ pub async fn serve(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()
get(users::user_login_get),
)
.route("/user/login", post(users::user_login_post))
.route("/user", get(users::user_get))
.route("/user", post(users::user_post))
.route("/user/balance", get(users::user_balance_get))
.route("/user/balance/:txid", post(users::user_balance_post))
.route("/user/profile", get(users::user_profile_get))
.route("/user/profile", post(users::user_profile_post))
.route("/user/keys", get(users::user_keys_get))
.route("/user/keys", post(users::user_keys_post))
.route("/user/revert_logs", get(users::user_revert_logs_get))
.route(
"/user/stats/aggregate",

View File

@ -1,9 +1,13 @@
//! Handle registration, logins, and managing account data.
use super::authorization::{login_is_authorized, UserKey};
use super::authorization::{login_is_authorized, RpcApiKey};
use super::errors::FrontendResult;
use crate::app::Web3ProxyApp;
use crate::user_queries::{get_aggregate_rpc_stats_from_params, get_detailed_stats};
use crate::user_queries::{
get_aggregate_rpc_stats_from_params, get_detailed_stats, get_page_from_params,
get_query_window_seconds_from_params,
};
use crate::user_queries::{get_chain_id_from_params, get_query_start_from_params};
use anyhow::Context;
use axum::{
extract::{Path, Query},
@ -13,12 +17,15 @@ use axum::{
};
use axum_client_ip::ClientIp;
use axum_macros::debug_handler;
use entities::{user, user_keys};
use entities::{revert_logs, user, user_keys};
use ethers::{prelude::Address, types::Bytes};
use hashbrown::HashMap;
use http::StatusCode;
use redis_rate_limiter::redis::AsyncCommands;
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, TransactionTrait};
use sea_orm::{
ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder,
QuerySelect, TransactionTrait,
};
use serde::Deserialize;
use serde_json::json;
use siwe::{Message, VerificationOpts};
@ -42,23 +49,21 @@ use ulid::Ulid;
/// This is the initial entrypoint for logging in. Take the response from this endpoint and give it to your user's wallet for singing. POST the response to `/user/login`.
///
/// Rate limited by IP address.
///
/// At first i thought about checking that user_address is in our db,
/// But theres no need to separate the registration and login flows.
/// It is a better UX to just click "login with ethereum" and have the account created if it doesn't exist.
/// We can prompt for an email and and payment after they log in.
#[debug_handler]
pub async fn user_login_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
ClientIp(ip): ClientIp,
// TODO: what does axum's error handling look like if the path fails to parse?
// TODO: allow ENS names here?
Path(mut params): Path<HashMap<String, String>>,
) -> FrontendResult {
login_is_authorized(&app, ip).await?;
// at first i thought about checking that user_address is in our db
// but theres no need to separate the registration and login flows
// its a better UX to just click "login with ethereum" and have the account created if it doesn't exist
// we can prompt for an email and and payment after they log in
// create a message and save it in redis
// TODO: how many seconds? get from config?
let expire_seconds: usize = 20 * 60;
@ -68,6 +73,7 @@ pub async fn user_login_get(
let expiration_time = issued_at.add(Duration::new(expire_seconds as i64, 0));
// TODO: allow ENS names here?
let user_address: Address = params
.remove("user_address")
// TODO: map_err so this becomes a 500. routing must be bad
@ -249,12 +255,13 @@ pub async fn user_login_post(
// create the user's first api key
// TODO: rename to UserApiKey? RpcApiKey?
let user_key = UserKey::new();
let rpc_key = RpcApiKey::new();
// TODO: variable requests per minute depending on the invite code
let uk = user_keys::ActiveModel {
user_id: sea_orm::Set(u.id),
api_key: sea_orm::Set(user_key.into()),
api_key: sea_orm::Set(rpc_key.into()),
description: sea_orm::Set(Some("first".to_string())),
requests_per_minute: sea_orm::Set(app.config.default_user_requests_per_minute),
..Default::default()
};
@ -286,13 +293,15 @@ pub async fn user_login_post(
// create a bearer token for the user.
let bearer_token = Ulid::new();
// json response with everything in it
// we could return just the bearer token, but I think they will always request api keys and the user profile
let response_json = json!({
"api_keys": uks
.into_iter()
.map(|uk| (uk.id, uk))
.collect::<HashMap<_, _>>(),
"bearer_token": bearer_token,
"user_id": u.id,
"user": u,
});
let response = (status_code, Json(response_json)).into_response();
@ -302,9 +311,7 @@ pub async fn user_login_post(
let bearer_redis_key = format!("bearer:{}", bearer_token);
// expire in 4 weeks
// TODO: do this with a pipe
// TODO: get expiration time from app config
// TODO: do we use this?
redis_conn
.set_ex(bearer_redis_key, u.id.to_string(), 2_419_200)
.await?;
@ -339,56 +346,10 @@ pub async fn user_logout_post(
/// the JSON input to the `post_user` handler.
#[derive(Deserialize)]
pub struct UserProfilePost {
primary_address: Address,
new_primary_address: Option<Address>,
// TODO: make sure the email address is valid. probably have a "verified" column in the database
email: Option<String>,
}
/// `POST /user/profile` -- modify the account connected to the bearer token in the `Authentication` header.
#[debug_handler]
pub async fn user_profile_post(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer_token)): TypedHeader<Authorization<Bearer>>,
Json(payload): Json<UserProfilePost>,
) -> FrontendResult {
let user = ProtectedAction::UserProfilePost(payload.primary_address)
.authorize(app.as_ref(), bearer_token)
.await?;
let mut user: user::ActiveModel = user.into();
// TODO: require a message from the new address to finish the change
if let Some(new_primary_address) = payload.new_primary_address {
if new_primary_address.is_zero() {
// TODO: allow this if some other authentication method is set
return Err(anyhow::anyhow!("cannot clear primary address").into());
} else {
let new_primary_address = Vec::from(new_primary_address.as_ref());
user.address = sea_orm::Set(new_primary_address)
}
}
if let Some(x) = payload.email {
// TODO: only Set if no change
if x.is_empty() {
user.email = sea_orm::Set(None);
} else {
// TODO: do some basic validation
// TODO: don't set immediatly, send a confirmation email first
user.email = sea_orm::Set(Some(x));
}
}
let db_conn = app.db_conn().context("Getting database connection")?;
user.save(&db_conn).await?;
// TODO: what should this return? the user?
Ok("success".into_response())
}
/// `GET /user/balance` -- Use a bearer token to get the user's balance and spend.
///
/// - show balance in USD
@ -427,9 +388,7 @@ pub async fn user_keys_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer_token)): TypedHeader<Authorization<Bearer>>,
) -> FrontendResult {
let user = ProtectedAction::UserKeys
.authorize(app.as_ref(), bearer_token)
.await?;
let (user, _semaphore) = app.bearer_is_authorized(bearer_token).await?;
let db_conn = app.db_conn().context("getting db to fetch user's keys")?;
@ -441,11 +400,11 @@ pub async fn user_keys_get(
// TODO: stricter type on this?
let response_json = json!({
"api_keys": uks
"user_id": user.id,
"user_rpc_keys": uks
.into_iter()
.map(|uk| (uk.id, uk))
.collect::<HashMap::<_, _>>(),
"user_id": user.id,
});
Ok(Json(response_json).into_response())
@ -463,17 +422,59 @@ pub async fn user_keys_post(
todo!("user_keys_post");
}
/// `GET /user/profile` -- Use a bearer token to get the user's profile.
/// `GET /user` -- Use a bearer token to get the user's profile.
///
/// - the email address of a user if they opted in to get contacted via email
///
/// TODO: this will change as we add better support for secondary users.
#[debug_handler]
pub async fn user_profile_get(
pub async fn user_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer_token)): TypedHeader<Authorization<Bearer>>,
) -> FrontendResult {
todo!("user_profile_get");
let (user, _semaphore) = app.bearer_is_authorized(bearer_token).await?;
Ok(Json(user).into_response())
}
/// `POST /user` -- modify the account connected to the bearer token in the `Authentication` header.
#[debug_handler]
pub async fn user_post(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer_token)): TypedHeader<Authorization<Bearer>>,
Json(payload): Json<UserProfilePost>,
) -> FrontendResult {
let (user, _semaphore) = app.bearer_is_authorized(bearer_token).await?;
let mut user: user::ActiveModel = user.into();
// update the email address
if let Some(x) = payload.email {
// TODO: only Set if no change
if x.is_empty() {
user.email = sea_orm::Set(None);
} else {
// TODO: do some basic validation
// TODO: don't set immediatly, send a confirmation email first
// TODO: compare first? or is sea orm smart enough to do that for us?
user.email = sea_orm::Set(Some(x));
}
}
// TODO: what else can we update here? password hash? subscription to newsletter?
let user = if user.is_changed() {
let db_conn = app.db_conn().context("Getting database connection")?;
user.save(&db_conn).await?
} else {
// no changes. no need to touch the database
user
};
let user: user::Model = user.try_into().context("Returning updated user")?;
Ok(Json(user).into_response())
}
/// `GET /user/revert_logs` -- Use a bearer token to get the user's revert logs.
@ -481,8 +482,54 @@ pub async fn user_profile_get(
pub async fn user_revert_logs_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer_token)): TypedHeader<Authorization<Bearer>>,
Query(params): Query<HashMap<String, String>>,
) -> FrontendResult {
todo!("user_revert_logs_get");
let (user, _semaphore) = app.bearer_is_authorized(bearer_token).await?;
let chain_id = get_chain_id_from_params(app.as_ref(), &params)?;
let query_start = get_query_start_from_params(&params)?;
let page = get_page_from_params(&params)?;
// TODO: page size from config
let page_size = 200;
let mut response = HashMap::new();
response.insert("page", json!(page));
response.insert("page_size", json!(page_size));
response.insert("chain_id", json!(chain_id));
response.insert("query_start", json!(query_start.timestamp() as u64));
let db_conn = app.db_conn().context("getting db for user's revert logs")?;
let uks = user_keys::Entity::find()
.filter(user_keys::Column::UserId.eq(user.id))
.all(&db_conn)
.await
.context("failed loading user's key")?;
// TODO: only select the ids
let uks: Vec<_> = uks.into_iter().map(|x| x.id).collect();
// get paginated logs
let q = revert_logs::Entity::find()
.filter(revert_logs::Column::Timestamp.gte(query_start))
.filter(revert_logs::Column::UserKeyId.is_in(uks))
.order_by_asc(revert_logs::Column::Timestamp);
let q = if chain_id == 0 {
// don't do anything
q
} else {
// filter on chain id
q.filter(revert_logs::Column::ChainId.eq(chain_id))
};
let revert_logs = q.paginate(&db_conn, page_size).fetch_page(page).await?;
response.insert("revert_logs", json!(revert_logs));
Ok(Json(response).into_response())
}
/// `GET /user/stats/detailed` -- Use a bearer token to get the user's key stats such as bandwidth used and methods requested.
@ -516,65 +563,3 @@ pub async fn user_stats_aggregate_get(
Ok(Json(x).into_response())
}
/// `GET /user/profile` -- Use a bearer token to get the user's profile such as their optional email address.
/// Handle authorization for a given address and bearer token.
// TODO: what roles should exist?
enum ProtectedAction {
UserKeys,
UserProfilePost(Address),
}
impl ProtectedAction {
/// Verify that the given bearer token and address are allowed to take the specified action.
/// This includes concurrent request limiting.
async fn authorize(self, app: &Web3ProxyApp, bearer: Bearer) -> anyhow::Result<user::Model> {
// get the attached address from redis for the given auth_token.
let mut redis_conn = app.redis_conn().await?;
// limit concurrent requests
let semaphore = app
.bearer_token_semaphores
.get_with(bearer.token().to_string(), async move {
let s = Semaphore::new(app.config.bearer_token_max_concurrent_requests as usize);
Arc::new(s)
})
.await;
let _semaphore_permit = semaphore.acquire().await?;
// get the user id for this bearer token
// TODO: move redis key building to a helper function
let bearer_cache_key = format!("bearer:{}", bearer.token());
// TODO: move this to a helper function
let user_id: u64 = redis_conn
.get::<_, Option<u64>>(bearer_cache_key)
.await
.context("fetching bearer cache key from redis")?
.context("unknown bearer token")?;
// turn user id into a user
let db_conn = app.db_conn().context("Getting database connection")?;
let user = user::Entity::find_by_id(user_id)
.one(&db_conn)
.await
.context("fetching user from db by id")?
.context("unknown user id")?;
match self {
Self::UserKeys => {
// no extra checks needed. bearer token gave us a user
}
Self::UserProfilePost(primary_address) => {
let user_address = Address::from_slice(&user.address);
if user_address != primary_address {
// TODO: check secondary users
return Err(anyhow::anyhow!("user address mismatch"));
}
}
}
Ok(user)
}
}

View File

@ -56,7 +56,7 @@ async fn get_user_id_from_params(
/// only allow user_key to be set if user_id is also set.
/// this will keep people from reading someone else's keys.
/// 0 means none.
fn get_user_key_id_from_params(
pub fn get_user_key_id_from_params(
user_id: u64,
params: &HashMap<String, String>,
) -> anyhow::Result<u64> {
@ -74,7 +74,7 @@ fn get_user_key_id_from_params(
}
}
fn get_chain_id_from_params(
pub fn get_chain_id_from_params(
app: &Web3ProxyApp,
params: &HashMap<String, String>,
) -> anyhow::Result<u64> {
@ -88,7 +88,7 @@ fn get_chain_id_from_params(
)
}
fn get_query_start_from_params(
pub fn get_query_start_from_params(
params: &HashMap<String, String>,
) -> anyhow::Result<chrono::NaiveDateTime> {
params.get("query_start").map_or_else(
@ -111,7 +111,7 @@ fn get_query_start_from_params(
)
}
fn get_page_from_params(params: &HashMap<String, String>) -> anyhow::Result<u64> {
pub fn get_page_from_params(params: &HashMap<String, String>) -> anyhow::Result<u64> {
params.get("page").map_or_else::<anyhow::Result<u64>, _, _>(
|| {
// no page in params. set default
@ -127,7 +127,9 @@ fn get_page_from_params(params: &HashMap<String, String>) -> anyhow::Result<u64>
)
}
fn get_query_window_seconds_from_params(params: &HashMap<String, String>) -> anyhow::Result<u64> {
pub fn get_query_window_seconds_from_params(
params: &HashMap<String, String>,
) -> anyhow::Result<u64> {
params.get("query_window_seconds").map_or_else(
|| {
// no page in params. set default
@ -179,12 +181,6 @@ pub async fn get_aggregate_rpc_stats_from_params(
serde_json::to_value(query_start.timestamp() as u64)?,
);
if query_window_seconds != 0 {
response.insert(
"query_window_seconds",
serde_json::to_value(query_window_seconds)?,
);
}
// TODO: how do we get count reverts compared to other errors? does it matter? what about http errors to our users?
// TODO: how do we count uptime?
let q = rpc_accounting::Entity::find()
@ -467,143 +463,3 @@ pub async fn get_detailed_stats(
Ok(response)
}
/// revert logs for a single key
///
/// TODO: take a "timebucket" duration in minutes that will make a more advanced
pub async fn get_revert_logs(
app: &Web3ProxyApp,
bearer: Option<TypedHeader<Authorization<Bearer>>>,
params: HashMap<String, String>,
) -> anyhow::Result<HashMap<&str, serde_json::Value>> {
let db_conn = app.db_conn().context("connecting to db")?;
let redis_conn = app.redis_conn().await.context("connecting to redis")?;
let user_id = get_user_id_from_params(redis_conn, bearer, &params).await?;
let user_key_id = get_user_key_id_from_params(user_id, &params)?;
let chain_id = get_chain_id_from_params(app, &params)?;
let query_start = get_query_start_from_params(&params)?;
let query_window_seconds = get_query_window_seconds_from_params(&params)?;
let page = get_page_from_params(&params)?;
let page_size = get_page_from_params(&params)?;
let mut response = HashMap::new();
response.insert("page", serde_json::to_value(page)?);
response.insert("page_size", serde_json::to_value(page_size)?);
response.insert("chain_id", serde_json::to_value(chain_id)?);
response.insert(
"query_start",
serde_json::to_value(query_start.timestamp() as u64)?,
);
if query_window_seconds != 0 {
response.insert(
"query_window_seconds",
serde_json::to_value(query_window_seconds)?,
);
}
// TODO: how do we get count reverts compared to other errors? does it matter? what about http errors to our users?
// TODO: how do we count uptime?
let q = rpc_accounting::Entity::find()
.select_only()
// groups
.column(rpc_accounting::Column::ErrorResponse)
.group_by(rpc_accounting::Column::ErrorResponse)
.column(rpc_accounting::Column::Method)
.group_by(rpc_accounting::Column::Method)
// aggregate columns
.column_as(
rpc_accounting::Column::FrontendRequests.sum(),
"total_requests",
)
.column_as(
rpc_accounting::Column::CacheMisses.sum(),
"total_cache_misses",
)
.column_as(rpc_accounting::Column::CacheHits.sum(), "total_cache_hits")
.column_as(
rpc_accounting::Column::BackendRetries.sum(),
"total_backend_retries",
)
.column_as(
rpc_accounting::Column::SumResponseBytes.sum(),
"total_response_bytes",
)
.column_as(
// TODO: can we sum bools like this?
rpc_accounting::Column::ErrorResponse.sum(),
"total_error_responses",
)
.column_as(
rpc_accounting::Column::SumResponseMillis.sum(),
"total_response_millis",
)
// TODO: order on method next?
.order_by_asc(rpc_accounting::Column::PeriodDatetime.min());
let condition = Condition::all().add(rpc_accounting::Column::PeriodDatetime.gte(query_start));
let (condition, q) = if chain_id.is_zero() {
// fetch all the chains. don't filter
// TODO: wait. do we want chain id on the logs? we can get that by joining key
let q = q
.column(rpc_accounting::Column::ChainId)
.group_by(rpc_accounting::Column::ChainId);
(condition, q)
} else {
let condition = condition.add(rpc_accounting::Column::ChainId.eq(chain_id));
(condition, q)
};
let (condition, q) = if user_id == 0 {
// 0 means everyone. don't filter on user
(condition, q)
} else {
// TODO: move authentication here?
// TODO: what about keys where this user is a secondary user?
let q = q
.join(
JoinType::InnerJoin,
rpc_accounting::Relation::UserKeys.def(),
)
.column(user_keys::Column::UserId)
// no need to group_by user_id when we are grouping by key_id
// .group_by(user_keys::Column::UserId)
.column(user_keys::Column::Id)
.group_by(user_keys::Column::Id);
let condition = condition.add(user_keys::Column::UserId.eq(user_id));
if user_key_id != 0 {
todo!("wip");
}
(condition, q)
};
let q = q.filter(condition);
// TODO: enum between searching on user_key_id on user_id
// TODO: handle secondary users, too
// log query here. i think sea orm has a useful log level for this
// TODO: transform this into a nested hashmap instead of a giant table?
let r = q
.into_json()
.paginate(&db_conn, page_size)
.fetch_page(page)
.await?;
response.insert("detailed_aggregate", serde_json::Value::Array(r));
// number of keys
// number of secondary keys
// avg and max concurrent requests per second per api key
Ok(response)
}