clean up migration

This commit is contained in:
Bryan Stitt 2022-11-01 18:54:39 +00:00
parent 1d22291737
commit 8b35bf5e63
26 changed files with 308 additions and 160 deletions

4
Cargo.lock generated
View File

@ -1348,7 +1348,7 @@ dependencies = [
[[package]] [[package]]
name = "entities" name = "entities"
version = "0.6.0" version = "0.8.0"
dependencies = [ dependencies = [
"sea-orm", "sea-orm",
"serde", "serde",
@ -2657,7 +2657,7 @@ dependencies = [
[[package]] [[package]]
name = "migration" name = "migration"
version = "0.7.0" version = "0.8.0"
dependencies = [ dependencies = [
"sea-orm-migration", "sea-orm-migration",
"tokio", "tokio",

View File

@ -64,6 +64,13 @@ $ websocat ws://127.0.0.1:8544
You can copy `config/example.toml` to `config/production-$CHAINNAME.toml` and then run `docker-compose up --build -d` start proxies for many chains. You can copy `config/example.toml` to `config/production-$CHAINNAME.toml` and then run `docker-compose up --build -d` start proxies for many chains.
Run migrations (useful during development. in production, the migrations run on application start)
```
cd migration
cargo run up
```
## Database entities ## Database entities
This command only needs to be run during development. Production should use the already generated entities. This command only needs to be run during development. Production should use the already generated entities.

View File

@ -1,6 +1,6 @@
[package] [package]
name = "entities" name = "entities"
version = "0.6.0" version = "0.8.0"
edition = "2021" edition = "2021"
[lib] [lib]

View File

@ -1,10 +1,11 @@
//! SeaORM Entity. Generated by sea-orm-codegen 0.10.0 //! SeaORM Entity. Generated by sea-orm-codegen 0.10.1
pub mod prelude; pub mod prelude;
pub mod revert_logs; pub mod revert_log;
pub mod rpc_accounting; pub mod rpc_accounting;
pub mod rpc_key;
pub mod sea_orm_active_enums; pub mod sea_orm_active_enums;
pub mod secondary_user; pub mod secondary_user;
pub mod user; pub mod user;
pub mod rpc_keys; pub mod user_tier;

View File

@ -1,7 +1,8 @@
//! SeaORM Entity. Generated by sea-orm-codegen 0.10.0 //! SeaORM Entity. Generated by sea-orm-codegen 0.10.1
pub use super::revert_logs::Entity as RevertLogs; pub use super::revert_log::Entity as RevertLog;
pub use super::rpc_accounting::Entity as RpcAccounting; pub use super::rpc_accounting::Entity as RpcAccounting;
pub use super::rpc_keys::Entity as UserKeys; pub use super::rpc_key::Entity as RpcKey;
pub use super::secondary_user::Entity as SecondaryUser; pub use super::secondary_user::Entity as SecondaryUser;
pub use super::user::Entity as User; pub use super::user::Entity as User;
pub use super::user_tier::Entity as UserTier;

View File

@ -1,11 +1,11 @@
//! SeaORM Entity. Generated by sea-orm-codegen 0.10.0 //! SeaORM Entity. Generated by sea-orm-codegen 0.10.1
use super::sea_orm_active_enums::Method; use super::sea_orm_active_enums::Method;
use sea_orm::entity::prelude::*; use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "revert_logs")] #[sea_orm(table_name = "revert_log")]
pub struct Model { pub struct Model {
#[sea_orm(primary_key)] #[sea_orm(primary_key)]
pub id: u64, pub id: u64,
@ -21,18 +21,18 @@ pub struct Model {
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation { pub enum Relation {
#[sea_orm( #[sea_orm(
belongs_to = "super::rpc_keys::Entity", belongs_to = "super::rpc_key::Entity",
from = "Column::RpcKeyId", from = "Column::RpcKeyId",
to = "super::rpc_keys::Column::Id", to = "super::rpc_key::Column::Id",
on_update = "NoAction", on_update = "NoAction",
on_delete = "NoAction" on_delete = "NoAction"
)] )]
UserKeys, RpcKey,
} }
impl Related<super::rpc_keys::Entity> for Entity { impl Related<super::rpc_key::Entity> for Entity {
fn to() -> RelationDef { fn to() -> RelationDef {
Relation::UserKeys.def() Relation::RpcKey.def()
} }
} }

View File

@ -1,4 +1,4 @@
//! SeaORM Entity. Generated by sea-orm-codegen 0.10.0 //! SeaORM Entity. Generated by sea-orm-codegen 0.10.1
use sea_orm::entity::prelude::*; use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -13,14 +13,12 @@ pub struct Model {
pub method: String, pub method: String,
pub error_response: bool, pub error_response: bool,
pub period_datetime: DateTimeUtc, pub period_datetime: DateTimeUtc,
// TODO: migration to make these u32
pub frontend_requests: u64, pub frontend_requests: u64,
pub backend_requests: u64, pub backend_requests: u64,
pub backend_retries: u64, pub backend_retries: u64,
pub no_servers: u64, pub no_servers: u64,
pub cache_misses: u64, pub cache_misses: u64,
pub cache_hits: u64, pub cache_hits: u64,
// TODO: end migration to make these u32
pub sum_request_bytes: u64, pub sum_request_bytes: u64,
pub min_request_bytes: u64, pub min_request_bytes: u64,
pub mean_request_bytes: f64, pub mean_request_bytes: f64,
@ -47,18 +45,18 @@ pub struct Model {
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation { pub enum Relation {
#[sea_orm( #[sea_orm(
belongs_to = "super::rpc_keys::Entity", belongs_to = "super::rpc_key::Entity",
from = "Column::RpcKeyId", from = "Column::RpcKeyId",
to = "super::rpc_keys::Column::Id", to = "super::rpc_key::Column::Id",
on_update = "NoAction", on_update = "NoAction",
on_delete = "NoAction" on_delete = "NoAction"
)] )]
RpcKeys, RpcKey,
} }
impl Related<super::rpc_keys::Entity> for Entity { impl Related<super::rpc_key::Entity> for Entity {
fn to() -> RelationDef { fn to() -> RelationDef {
Relation::RpcKeys.def() Relation::RpcKey.def()
} }
} }

View File

@ -1,22 +1,19 @@
//! SeaORM Entity. Generated by sea-orm-codegen 0.10.0 //! SeaORM Entity. Generated by sea-orm-codegen 0.10.1
use sea_orm::entity::prelude::*; use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)]
#[sea_orm(table_name = "rpc_keys")] #[sea_orm(table_name = "rpc_key")]
pub struct Model { pub struct Model {
#[sea_orm(primary_key)] #[sea_orm(primary_key)]
pub id: u64, pub id: u64,
pub user_id: u64, pub user_id: u64,
#[sea_orm(unique)] #[sea_orm(unique)]
pub rpc_key: Uuid, pub secret_key: Uuid,
pub description: Option<String>, pub description: Option<String>,
pub private_txs: bool, pub private_txs: bool,
pub active: bool, pub active: bool,
pub requests_per_minute: Option<u64>,
#[sea_orm(column_type = "Decimal(Some((5, 4)))")]
pub log_revert_chance: Decimal,
#[sea_orm(column_type = "Text", nullable)] #[sea_orm(column_type = "Text", nullable)]
pub allowed_ips: Option<String>, pub allowed_ips: Option<String>,
#[sea_orm(column_type = "Text", nullable)] #[sea_orm(column_type = "Text", nullable)]
@ -25,7 +22,7 @@ pub struct Model {
pub allowed_referers: Option<String>, pub allowed_referers: Option<String>,
#[sea_orm(column_type = "Text", nullable)] #[sea_orm(column_type = "Text", nullable)]
pub allowed_user_agents: Option<String>, pub allowed_user_agents: Option<String>,
pub max_concurrent_requests: Option<u64>, pub log_revert_chance: f64,
} }
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
@ -38,8 +35,8 @@ pub enum Relation {
on_delete = "NoAction" on_delete = "NoAction"
)] )]
User, User,
#[sea_orm(has_many = "super::revert_logs::Entity")] #[sea_orm(has_many = "super::revert_log::Entity")]
RevertLogs, RevertLog,
#[sea_orm(has_many = "super::rpc_accounting::Entity")] #[sea_orm(has_many = "super::rpc_accounting::Entity")]
RpcAccounting, RpcAccounting,
} }
@ -50,9 +47,9 @@ impl Related<super::user::Entity> for Entity {
} }
} }
impl Related<super::revert_logs::Entity> for Entity { impl Related<super::revert_log::Entity> for Entity {
fn to() -> RelationDef { fn to() -> RelationDef {
Relation::RevertLogs.def() Relation::RevertLog.def()
} }
} }

View File

@ -1,7 +1,6 @@
//! SeaORM Entity. Generated by sea-orm-codegen 0.10.0 //! SeaORM Entity. Generated by sea-orm-codegen 0.10.1
use sea_orm::entity::prelude::*; use sea_orm::entity::prelude::*;
use sea_orm::EnumIter;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]

View File

@ -1,4 +1,4 @@
//! SeaORM Entity. Generated by sea-orm-codegen 0.10.0 //! SeaORM Entity. Generated by sea-orm-codegen 0.10.1
use super::sea_orm_active_enums::Role; use super::sea_orm_active_enums::Role;
use sea_orm::entity::prelude::*; use sea_orm::entity::prelude::*;
@ -10,9 +10,7 @@ pub struct Model {
#[sea_orm(primary_key)] #[sea_orm(primary_key)]
pub id: u64, pub id: u64,
pub user_id: u64, pub user_id: u64,
pub address: Vec<u8>,
pub description: Option<String>, pub description: Option<String>,
pub email: Option<String>,
pub role: Role, pub role: Role,
} }

View File

@ -1,4 +1,4 @@
//! SeaORM Entity. Generated by sea-orm-codegen 0.10.0 //! SeaORM Entity. Generated by sea-orm-codegen 0.10.1
use sea_orm::entity::prelude::*; use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -12,14 +12,35 @@ pub struct Model {
pub address: Vec<u8>, pub address: Vec<u8>,
pub description: Option<String>, pub description: Option<String>,
pub email: Option<String>, pub email: Option<String>,
pub user_tier_id: u64,
} }
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation { pub enum Relation {
#[sea_orm(
belongs_to = "super::user_tier::Entity",
from = "Column::UserTierId",
to = "super::user_tier::Column::Id",
on_update = "NoAction",
on_delete = "NoAction"
)]
UserTier,
#[sea_orm(has_many = "super::rpc_key::Entity")]
RpcKey,
#[sea_orm(has_many = "super::secondary_user::Entity")] #[sea_orm(has_many = "super::secondary_user::Entity")]
SecondaryUser, SecondaryUser,
#[sea_orm(has_many = "super::rpc_keys::Entity")] }
RpcKeys,
impl Related<super::user_tier::Entity> for Entity {
fn to() -> RelationDef {
Relation::UserTier.def()
}
}
impl Related<super::rpc_key::Entity> for Entity {
fn to() -> RelationDef {
Relation::RpcKey.def()
}
} }
impl Related<super::secondary_user::Entity> for Entity { impl Related<super::secondary_user::Entity> for Entity {
@ -28,10 +49,4 @@ impl Related<super::secondary_user::Entity> for Entity {
} }
} }
impl Related<super::rpc_keys::Entity> for Entity {
fn to() -> RelationDef {
Relation::RpcKeys.def()
}
}
impl ActiveModelBehavior for ActiveModel {} impl ActiveModelBehavior for ActiveModel {}

28
entities/src/user_tier.rs Normal file
View File

@ -0,0 +1,28 @@
//! SeaORM Entity. Generated by sea-orm-codegen 0.10.1
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "user_tier")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: u64,
pub title: String,
pub max_requests_per_minute: Option<u64>,
pub max_concurrent_requests: Option<u32>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(has_many = "super::user::Entity")]
User,
}
impl Related<super::user::Entity> for Entity {
fn to() -> RelationDef {
Relation::User.def()
}
}
impl ActiveModelBehavior for ActiveModel {}

View File

@ -1,6 +1,6 @@
[package] [package]
name = "migration" name = "migration"
version = "0.7.0" version = "0.8.0"
edition = "2021" edition = "2021"
publish = false publish = false

View File

@ -7,6 +7,7 @@ mod m20221007_213828_accounting;
mod m20221025_210326_add_chain_id_to_reverts; mod m20221025_210326_add_chain_id_to_reverts;
mod m20221026_230819_rename_user_keys; mod m20221026_230819_rename_user_keys;
mod m20221027_002407_user_tiers; mod m20221027_002407_user_tiers;
mod m20221031_211916_clean_up;
pub struct Migrator; pub struct Migrator;
@ -21,6 +22,7 @@ impl MigratorTrait for Migrator {
Box::new(m20221025_210326_add_chain_id_to_reverts::Migration), Box::new(m20221025_210326_add_chain_id_to_reverts::Migration),
Box::new(m20221026_230819_rename_user_keys::Migration), Box::new(m20221026_230819_rename_user_keys::Migration),
Box::new(m20221027_002407_user_tiers::Migration), Box::new(m20221027_002407_user_tiers::Migration),
Box::new(m20221031_211916_clean_up::Migration),
] ]
} }
} }

View File

@ -0,0 +1,103 @@
use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// rename tables from plural to singluar
manager
.rename_table(
Table::rename()
.table(Alias::new("revert_logs"), Alias::new("revert_log"))
.to_owned(),
)
.await?;
manager
.rename_table(
Table::rename()
.table(Alias::new("rpc_keys"), Alias::new("rpc_key"))
.to_owned(),
)
.await?;
// on rpc_key table, rename rpc_key to secret_key
manager
.alter_table(
Table::alter()
.table(Alias::new("rpc_key"))
.rename_column(Alias::new("rpc_key"), Alias::new("secret_key"))
.to_owned(),
)
.await?;
// on revert_log table, rename user_key_id to rpc_key_id
manager
.alter_table(
Table::alter()
.table(Alias::new("revert_log"))
.rename_column(Alias::new("user_key_id"), Alias::new("rpc_key_id"))
.to_owned(),
)
.await?;
// on rpc_accounting table, rename user_key_id to rpc_key_id
manager
.alter_table(
Table::alter()
.table(Alias::new("rpc_accounting"))
.rename_column(Alias::new("user_key_id"), Alias::new("rpc_key_id"))
.to_owned(),
)
.await?;
// on secondary_users table, remove "email" and "address" column
manager
.alter_table(
Table::alter()
.table(Alias::new("secondary_user"))
.drop_column(Alias::new("email"))
.drop_column(Alias::new("address"))
.to_owned(),
)
.await?;
// on user_tier table, rename requests_per_minute to max_requests_per_minute
manager
.alter_table(
Table::alter()
.table(Alias::new("user_tier"))
.rename_column(
Alias::new("requests_per_minute"),
Alias::new("max_requests_per_minute"),
)
.to_owned(),
)
.await?;
manager
.alter_table(
Table::alter()
.table(Alias::new("rpc_key"))
.drop_column(Alias::new("log_revert_chance"))
.add_column(
ColumnDef::new(Alias::new("log_revert_chance"))
.double()
.not_null()
.default(0.0),
)
.to_owned(),
)
.await?;
// rename column rpc_key to rpc_secret_key
Ok(())
}
async fn down(&self, _manager: &SchemaManager) -> Result<(), DbErr> {
// Replace the sample below with your own migration scripts
todo!();
}
}

View File

@ -29,7 +29,6 @@ use metered::{metered, ErrorCount, HitCount, ResponseTime, Throughput};
use migration::{Migrator, MigratorTrait}; use migration::{Migrator, MigratorTrait};
use moka::future::Cache; use moka::future::Cache;
use redis_rate_limiter::{DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter}; use redis_rate_limiter::{DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter};
use sea_orm::prelude::Decimal;
use sea_orm::DatabaseConnection; use sea_orm::DatabaseConnection;
use serde::Serialize; use serde::Serialize;
use serde_json::json; use serde_json::json;
@ -68,9 +67,9 @@ pub struct UserKeyData {
pub user_id: u64, pub user_id: u64,
/// database id of the rpc key /// database id of the rpc key
pub rpc_key_id: u64, pub rpc_key_id: u64,
/// if None, allow unlimited queries /// if None, allow unlimited queries. inherited from the user_tier
pub max_requests_per_period: Option<u64>, pub max_requests_per_period: Option<u64>,
// if None, allow unlimited concurrent requests // if None, allow unlimited concurrent requests. inherited from the user_tier
pub max_concurrent_requests: Option<u64>, pub max_concurrent_requests: Option<u64>,
/// if None, allow any Origin /// if None, allow any Origin
pub allowed_origins: Option<Vec<Origin>>, pub allowed_origins: Option<Vec<Origin>>,
@ -81,7 +80,8 @@ pub struct UserKeyData {
/// if None, allow any IP Address /// if None, allow any IP Address
pub allowed_ips: Option<Vec<IpNet>>, pub allowed_ips: Option<Vec<IpNet>>,
/// Chance to save reverting eth_call, eth_estimateGas, and eth_sendRawTransaction to the database. /// Chance to save reverting eth_call, eth_estimateGas, and eth_sendRawTransaction to the database.
pub log_revert_chance: Decimal, /// TODO: f32 would be fine
pub log_revert_chance: f64,
} }
/// The application /// The application
@ -108,7 +108,8 @@ pub struct Web3ProxyApp {
pub frontend_key_rate_limiter: Option<DeferredRateLimiter<Ulid>>, pub frontend_key_rate_limiter: Option<DeferredRateLimiter<Ulid>>,
pub login_rate_limiter: Option<RedisRateLimiter>, pub login_rate_limiter: Option<RedisRateLimiter>,
pub vredis_pool: Option<RedisPool>, pub vredis_pool: Option<RedisPool>,
pub rpc_key_cache: Cache<Ulid, UserKeyData, hashbrown::hash_map::DefaultHashBuilder>, // TODO: this key should be our RpcSecretKey class, not Ulid
pub rpc_secret_key_cache: Cache<Ulid, UserKeyData, hashbrown::hash_map::DefaultHashBuilder>,
pub rpc_key_semaphores: Cache<u64, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>, pub rpc_key_semaphores: Cache<u64, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
pub ip_semaphores: Cache<IpAddr, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>, pub ip_semaphores: Cache<IpAddr, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
pub bearer_token_semaphores: pub bearer_token_semaphores:
@ -456,9 +457,9 @@ impl Web3ProxyApp {
// if there is no database of users, there will be no keys and so this will be empty // if there is no database of users, there will be no keys and so this will be empty
// TODO: max_capacity from config // TODO: max_capacity from config
// TODO: ttl from config // TODO: ttl from config
let rpc_key_cache = Cache::builder() let rpc_secret_key_cache = Cache::builder()
.max_capacity(10_000) .max_capacity(10_000)
.time_to_live(Duration::from_secs(60)) .time_to_live(Duration::from_secs(600))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new()); .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new());
// create semaphores for concurrent connection limits // create semaphores for concurrent connection limits
@ -488,7 +489,7 @@ impl Web3ProxyApp {
vredis_pool, vredis_pool,
app_metrics, app_metrics,
open_request_handle_metrics, open_request_handle_metrics,
rpc_key_cache, rpc_secret_key_cache,
bearer_token_semaphores, bearer_token_semaphores,
ip_semaphores, ip_semaphores,
rpc_key_semaphores, rpc_key_semaphores,

View File

@ -267,7 +267,7 @@ mod tests {
let app_config = TopConfig { let app_config = TopConfig {
app: AppConfig { app: AppConfig {
chain_id: 31337, chain_id: 31337,
default_user_requests_per_minute: Some(6_000_000), default_user_max_requests_per_minute: Some(6_000_000),
min_sum_soft_limit: 1, min_sum_soft_limit: 1,
min_synced_rpcs: 1, min_synced_rpcs: 1,
public_requests_per_minute: Some(1_000_000), public_requests_per_minute: Some(1_000_000),

View File

@ -35,7 +35,7 @@ impl CheckConfigSubCommand {
Some(_) => {} Some(_) => {}
} }
match top_config.app.default_user_requests_per_minute { match top_config.app.default_user_max_requests_per_minute {
None => { None => {
info!("app.default_user_requests_per_minute is None. Fully open to registered requests!") info!("app.default_user_requests_per_minute is None. Fully open to registered requests!")
} }

View File

@ -1,12 +1,12 @@
use anyhow::Context; use anyhow::Context;
use argh::FromArgs; use argh::FromArgs;
use entities::{rpc_keys, user}; use entities::{rpc_key, user};
use ethers::prelude::Address; use ethers::prelude::Address;
use sea_orm::{ActiveModelTrait, TransactionTrait}; use sea_orm::{ActiveModelTrait, TransactionTrait};
use tracing::info; use tracing::info;
use ulid::Ulid; use ulid::Ulid;
use uuid::Uuid; use uuid::Uuid;
use web3_proxy::frontend::authorization::RpcApiKey; use web3_proxy::frontend::authorization::RpcSecretKey;
#[derive(FromArgs, PartialEq, Debug, Eq)] #[derive(FromArgs, PartialEq, Debug, Eq)]
/// Create a new user and api key /// Create a new user and api key
@ -25,12 +25,7 @@ pub struct CreateUserSubCommand {
/// the user's first api ULID or UUID key. /// the user's first api ULID or UUID key.
/// If none given, one will be created. /// If none given, one will be created.
#[argh(option)] #[argh(option)]
rpc_key: RpcApiKey, rpc_secret_key: RpcSecretKey,
/// the key's maximum requests per minute.
/// Default to "None" which the code sees as "unlimited" requests.
#[argh(option)]
rpm: Option<u64>,
/// an optional short description of the key's purpose. /// an optional short description of the key's purpose.
#[argh(option)] #[argh(option)]
@ -74,10 +69,9 @@ impl CreateUserSubCommand {
); );
// create a key for the new user // create a key for the new user
let uk = rpc_keys::ActiveModel { let uk = rpc_key::ActiveModel {
user_id: u.id, user_id: u.id,
rpc_key: sea_orm::Set(self.rpc_key.into()), secret_key: sea_orm::Set(self.rpc_secret_key.into()),
requests_per_minute: sea_orm::Set(self.rpm),
description: sea_orm::Set(self.description), description: sea_orm::Set(self.description),
..Default::default() ..Default::default()
}; };
@ -87,8 +81,8 @@ impl CreateUserSubCommand {
txn.commit().await?; txn.commit().await?;
info!("user key as ULID: {}", Ulid::from(self.rpc_key)); info!("user key as ULID: {}", Ulid::from(self.rpc_secret_key));
info!("user key as UUID: {}", Uuid::from(self.rpc_key)); info!("user key as UUID: {}", Uuid::from(self.rpc_secret_key));
Ok(()) Ok(())
} }

View File

@ -74,7 +74,7 @@ pub struct AppConfig {
/// Default request limit for registered users. /// Default request limit for registered users.
/// 0 = block all requests /// 0 = block all requests
/// None = allow all requests /// None = allow all requests
pub default_user_requests_per_minute: Option<u64>, pub default_user_max_requests_per_minute: Option<u64>,
/// Restrict user registration. /// Restrict user registration.
/// None = no code needed /// None = no code needed

View File

@ -10,12 +10,12 @@ use axum::headers::{Header, Origin, Referer, UserAgent};
use axum::TypedHeader; use axum::TypedHeader;
use chrono::Utc; use chrono::Utc;
use deferred_rate_limiter::DeferredRateLimitResult; use deferred_rate_limiter::DeferredRateLimitResult;
use entities::{rpc_keys, user}; use entities::{rpc_key, user, user_tier};
use http::HeaderValue; use http::HeaderValue;
use ipnet::IpNet; use ipnet::IpNet;
use redis_rate_limiter::redis::AsyncCommands; use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::RedisRateLimitResult; use redis_rate_limiter::RedisRateLimitResult;
use sea_orm::{prelude::Decimal, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter}; use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use std::fmt::Display; use std::fmt::Display;
use std::sync::atomic::{AtomicBool, AtomicU64}; use std::sync::atomic::{AtomicBool, AtomicU64};
use std::{net::IpAddr, str::FromStr, sync::Arc}; use std::{net::IpAddr, str::FromStr, sync::Arc};
@ -28,7 +28,7 @@ use uuid::Uuid;
/// This lets us use UUID and ULID while we transition to only ULIDs /// This lets us use UUID and ULID while we transition to only ULIDs
/// TODO: include the key's description. /// TODO: include the key's description.
#[derive(Copy, Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)] #[derive(Copy, Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)]
pub enum RpcApiKey { pub enum RpcSecretKey {
Ulid(Ulid), Ulid(Ulid),
Uuid(Uuid), Uuid(Uuid),
} }
@ -55,7 +55,7 @@ pub struct AuthorizedKey {
pub user_id: u64, pub user_id: u64,
pub rpc_key_id: u64, pub rpc_key_id: u64,
// TODO: just use an f32? even an f16 is probably fine // TODO: just use an f32? even an f16 is probably fine
pub log_revert_chance: Decimal, pub log_revert_chance: f64,
} }
#[derive(Debug)] #[derive(Debug)]
@ -107,19 +107,19 @@ impl RequestMetadata {
} }
} }
impl RpcApiKey { impl RpcSecretKey {
pub fn new() -> Self { pub fn new() -> Self {
Ulid::new().into() Ulid::new().into()
} }
} }
impl Default for RpcApiKey { impl Default for RpcSecretKey {
fn default() -> Self { fn default() -> Self {
Self::new() Self::new()
} }
} }
impl Display for RpcApiKey { impl Display for RpcSecretKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// TODO: do this without dereferencing // TODO: do this without dereferencing
let ulid: Ulid = (*self).into(); let ulid: Ulid = (*self).into();
@ -128,7 +128,7 @@ impl Display for RpcApiKey {
} }
} }
impl FromStr for RpcApiKey { impl FromStr for RpcSecretKey {
type Err = anyhow::Error; type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> { fn from_str(s: &str) -> Result<Self, Self::Err> {
@ -143,32 +143,32 @@ impl FromStr for RpcApiKey {
} }
} }
impl From<Ulid> for RpcApiKey { impl From<Ulid> for RpcSecretKey {
fn from(x: Ulid) -> Self { fn from(x: Ulid) -> Self {
RpcApiKey::Ulid(x) RpcSecretKey::Ulid(x)
} }
} }
impl From<Uuid> for RpcApiKey { impl From<Uuid> for RpcSecretKey {
fn from(x: Uuid) -> Self { fn from(x: Uuid) -> Self {
RpcApiKey::Uuid(x) RpcSecretKey::Uuid(x)
} }
} }
impl From<RpcApiKey> for Ulid { impl From<RpcSecretKey> for Ulid {
fn from(x: RpcApiKey) -> Self { fn from(x: RpcSecretKey) -> Self {
match x { match x {
RpcApiKey::Ulid(x) => x, RpcSecretKey::Ulid(x) => x,
RpcApiKey::Uuid(x) => Ulid::from(x.as_u128()), RpcSecretKey::Uuid(x) => Ulid::from(x.as_u128()),
} }
} }
} }
impl From<RpcApiKey> for Uuid { impl From<RpcSecretKey> for Uuid {
fn from(x: RpcApiKey) -> Self { fn from(x: RpcSecretKey) -> Self {
match x { match x {
RpcApiKey::Ulid(x) => Uuid::from_u128(x.0), RpcSecretKey::Ulid(x) => Uuid::from_u128(x.0),
RpcApiKey::Uuid(x) => x, RpcSecretKey::Uuid(x) => x,
} }
} }
} }
@ -297,7 +297,7 @@ pub async fn ip_is_authorized(
pub async fn key_is_authorized( pub async fn key_is_authorized(
app: &Web3ProxyApp, app: &Web3ProxyApp,
rpc_key: RpcApiKey, rpc_key: RpcSecretKey,
ip: IpAddr, ip: IpAddr,
origin: Option<Origin>, origin: Option<Origin>,
referer: Option<Referer>, referer: Option<Referer>,
@ -502,27 +502,32 @@ impl Web3ProxyApp {
// check the local cache for user data, or query the database // check the local cache for user data, or query the database
#[instrument(level = "trace")] #[instrument(level = "trace")]
pub(crate) async fn user_data(&self, rpc_key: RpcApiKey) -> anyhow::Result<UserKeyData> { pub(crate) async fn user_data(
&self,
rpc_secret_key: RpcSecretKey,
) -> anyhow::Result<UserKeyData> {
let user_data: Result<_, Arc<anyhow::Error>> = self let user_data: Result<_, Arc<anyhow::Error>> = self
.rpc_key_cache .rpc_secret_key_cache
.try_get_with(rpc_key.into(), async move { .try_get_with(rpc_secret_key.into(), async move {
trace!(?rpc_key, "user_cache miss"); trace!(?rpc_secret_key, "user cache miss");
let db_conn = self.db_conn().context("Getting database connection")?; let db_conn = self.db_conn().context("Getting database connection")?;
let rpc_key: Uuid = rpc_key.into(); let rpc_secret_key: Uuid = rpc_secret_key.into();
// TODO: join the user table to this to return the User? we don't always need it // TODO: join the user table to this to return the User? we don't always need it
// TODO: also attach secondary users // TODO: join on secondary users
match rpc_keys::Entity::find() // TODO: join on user tier
.filter(rpc_keys::Column::RpcKey.eq(rpc_key)) match rpc_key::Entity::find()
.filter(rpc_keys::Column::Active.eq(true)) .filter(rpc_key::Column::SecretKey.eq(rpc_secret_key))
.filter(rpc_key::Column::Active.eq(true))
.one(&db_conn) .one(&db_conn)
.await? .await?
{ {
Some(rpc_key_model) => { Some(rpc_key_model) => {
// TODO: move these splits into helper functions // TODO: move these splits into helper functions
// TODO: can we have sea orm handle this for us? // TODO: can we have sea orm handle this for us?
// let user_tier_model = rpc_key_model.
let allowed_ips: Option<Vec<IpNet>> = let allowed_ips: Option<Vec<IpNet>> =
if let Some(allowed_ips) = rpc_key_model.allowed_ips { if let Some(allowed_ips) = rpc_key_model.allowed_ips {
@ -575,16 +580,18 @@ impl Web3ProxyApp {
None None
}; };
// let user_tier_model = user_tier
Ok(UserKeyData { Ok(UserKeyData {
user_id: rpc_key_model.user_id, user_id: rpc_key_model.user_id,
rpc_key_id: rpc_key_model.id, rpc_key_id: rpc_key_model.id,
max_requests_per_period: rpc_key_model.requests_per_minute,
max_concurrent_requests: rpc_key_model.max_concurrent_requests,
allowed_ips, allowed_ips,
allowed_origins, allowed_origins,
allowed_referers, allowed_referers,
allowed_user_agents, allowed_user_agents,
log_revert_chance: rpc_key_model.log_revert_chance, log_revert_chance: rpc_key_model.log_revert_chance,
max_concurrent_requests: None, // todo! user_tier_model.max_concurrent_requests,
max_requests_per_period: None, // todo! user_tier_model.max_requests_per_period,
}) })
} }
None => Ok(UserKeyData::default()), None => Ok(UserKeyData::default()),
@ -597,7 +604,10 @@ impl Web3ProxyApp {
} }
#[instrument(level = "trace")] #[instrument(level = "trace")]
pub async fn rate_limit_by_key(&self, rpc_key: RpcApiKey) -> anyhow::Result<RateLimitResult> { pub async fn rate_limit_by_key(
&self,
rpc_key: RpcSecretKey,
) -> anyhow::Result<RateLimitResult> {
let user_data = self.user_data(rpc_key).await?; let user_data = self.user_data(rpc_key).await?;
if user_data.rpc_key_id == 0 { if user_data.rpc_key_id == 0 {

View File

@ -39,14 +39,14 @@ pub async fn prometheus(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl In
#[instrument(level = "trace")] #[instrument(level = "trace")]
pub async fn status(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse { pub async fn status(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
app.pending_transactions.sync(); app.pending_transactions.sync();
app.rpc_key_cache.sync(); app.rpc_secret_key_cache.sync();
// TODO: what else should we include? uptime, cache hit rates, cpu load // TODO: what else should we include? uptime, cache hit rates, cpu load
let body = json!({ let body = json!({
"pending_transactions_count": app.pending_transactions.entry_count(), "pending_transactions_count": app.pending_transactions.entry_count(),
"pending_transactions_size": app.pending_transactions.weighted_size(), "pending_transactions_size": app.pending_transactions.weighted_size(),
"user_cache_count": app.rpc_key_cache.entry_count(), "user_cache_count": app.rpc_secret_key_cache.entry_count(),
"user_cache_size": app.rpc_key_cache.weighted_size(), "user_cache_size": app.rpc_secret_key_cache.weighted_size(),
"balanced_rpcs": app.balanced_rpcs, "balanced_rpcs": app.balanced_rpcs,
"private_rpcs": app.private_rpcs, "private_rpcs": app.private_rpcs,
}); });

View File

@ -1,6 +1,6 @@
//! Handle registration, logins, and managing account data. //! Handle registration, logins, and managing account data.
use super::authorization::{login_is_authorized, RpcApiKey}; use super::authorization::{login_is_authorized, RpcSecretKey};
use super::errors::FrontendResult; use super::errors::FrontendResult;
use crate::app::Web3ProxyApp; use crate::app::Web3ProxyApp;
use crate::user_queries::{ use crate::user_queries::{
@ -18,7 +18,7 @@ use axum::{
}; };
use axum_client_ip::ClientIp; use axum_client_ip::ClientIp;
use axum_macros::debug_handler; use axum_macros::debug_handler;
use entities::{revert_logs, rpc_keys, user}; use entities::{revert_log, rpc_key, user};
use ethers::{prelude::Address, types::Bytes}; use ethers::{prelude::Address, types::Bytes};
use hashbrown::HashMap; use hashbrown::HashMap;
use http::{HeaderValue, StatusCode}; use http::{HeaderValue, StatusCode};
@ -250,6 +250,8 @@ pub async fn user_login_post(
// the only thing we need from them is an address // the only thing we need from them is an address
// everything else is optional // everything else is optional
// TODO: different invite codes should allow different levels
// TODO: maybe decrement a count on the invite code?
let u = user::ActiveModel { let u = user::ActiveModel {
address: sea_orm::Set(our_msg.address.into()), address: sea_orm::Set(our_msg.address.into()),
..Default::default() ..Default::default()
@ -259,14 +261,13 @@ pub async fn user_login_post(
// create the user's first api key // create the user's first api key
// TODO: rename to UserApiKey? RpcApiKey? // TODO: rename to UserApiKey? RpcApiKey?
let rpc_key = RpcApiKey::new(); let rpc_secret_key = RpcSecretKey::new();
// TODO: variable requests per minute depending on the invite code // TODO: variable requests per minute depending on the invite code
let uk = rpc_keys::ActiveModel { let uk = rpc_key::ActiveModel {
user_id: sea_orm::Set(u.id), user_id: sea_orm::Set(u.id),
rpc_key: sea_orm::Set(rpc_key.into()), secret_key: sea_orm::Set(rpc_secret_key.into()),
description: sea_orm::Set(Some("first".to_string())), description: sea_orm::Set(Some("first".to_string())),
requests_per_minute: sea_orm::Set(app.config.default_user_requests_per_minute),
..Default::default() ..Default::default()
}; };
@ -284,8 +285,8 @@ pub async fn user_login_post(
} }
Some(u) => { Some(u) => {
// the user is already registered // the user is already registered
let uks = rpc_keys::Entity::find() let uks = rpc_key::Entity::find()
.filter(rpc_keys::Column::UserId.eq(u.id)) .filter(rpc_key::Column::UserId.eq(u.id))
.all(&db_conn) .all(&db_conn)
.await .await
.context("failed loading user's key")?; .context("failed loading user's key")?;
@ -460,8 +461,8 @@ pub async fn rpc_keys_get(
let db_conn = app.db_conn().context("getting db to fetch user's keys")?; let db_conn = app.db_conn().context("getting db to fetch user's keys")?;
let uks = rpc_keys::Entity::find() let uks = rpc_key::Entity::find()
.filter(rpc_keys::Column::UserId.eq(user.id)) .filter(rpc_key::Column::UserId.eq(user.id))
.all(&db_conn) .all(&db_conn)
.await .await
.context("failed loading user's key")?; .context("failed loading user's key")?;
@ -525,9 +526,9 @@ pub async fn rpc_keys_management(
let mut uk = if let Some(existing_key_id) = payload.key_id { let mut uk = if let Some(existing_key_id) = payload.key_id {
// get the key and make sure it belongs to the user // get the key and make sure it belongs to the user
let uk = rpc_keys::Entity::find() let uk = rpc_key::Entity::find()
.filter(rpc_keys::Column::UserId.eq(user.id)) .filter(rpc_key::Column::UserId.eq(user.id))
.filter(rpc_keys::Column::Id.eq(existing_key_id)) .filter(rpc_key::Column::Id.eq(existing_key_id))
.one(&db_conn) .one(&db_conn)
.await .await
.context("failed loading user's key")? .context("failed loading user's key")?
@ -537,12 +538,11 @@ pub async fn rpc_keys_management(
} else { } else {
// make a new key // make a new key
// TODO: limit to 10 keys? // TODO: limit to 10 keys?
let rpc_key = RpcApiKey::new(); let secret_key = RpcSecretKey::new();
rpc_keys::ActiveModel { rpc_key::ActiveModel {
user_id: sea_orm::Set(user.id), user_id: sea_orm::Set(user.id),
rpc_key: sea_orm::Set(rpc_key.into()), secret_key: sea_orm::Set(secret_key.into()),
requests_per_minute: sea_orm::Set(app.config.default_user_requests_per_minute),
..Default::default() ..Default::default()
} }
}; };
@ -671,7 +671,7 @@ pub async fn rpc_keys_management(
uk uk
}; };
let uk: rpc_keys::Model = uk.try_into()?; let uk: rpc_key::Model = uk.try_into()?;
Ok(Json(uk).into_response()) Ok(Json(uk).into_response())
} }
@ -702,8 +702,8 @@ pub async fn user_revert_logs_get(
let db_conn = app.db_conn().context("getting db for user's revert logs")?; let db_conn = app.db_conn().context("getting db for user's revert logs")?;
let uks = rpc_keys::Entity::find() let uks = rpc_key::Entity::find()
.filter(rpc_keys::Column::UserId.eq(user.id)) .filter(rpc_key::Column::UserId.eq(user.id))
.all(&db_conn) .all(&db_conn)
.await .await
.context("failed loading user's key")?; .context("failed loading user's key")?;
@ -712,17 +712,17 @@ pub async fn user_revert_logs_get(
let uks: Vec<_> = uks.into_iter().map(|x| x.id).collect(); let uks: Vec<_> = uks.into_iter().map(|x| x.id).collect();
// get paginated logs // get paginated logs
let q = revert_logs::Entity::find() let q = revert_log::Entity::find()
.filter(revert_logs::Column::Timestamp.gte(query_start)) .filter(revert_log::Column::Timestamp.gte(query_start))
.filter(revert_logs::Column::RpcKeyId.is_in(uks)) .filter(revert_log::Column::RpcKeyId.is_in(uks))
.order_by_asc(revert_logs::Column::Timestamp); .order_by_asc(revert_log::Column::Timestamp);
let q = if chain_id == 0 { let q = if chain_id == 0 {
// don't do anything // don't do anything
q q
} else { } else {
// filter on chain id // filter on chain id
q.filter(revert_logs::Column::ChainId.eq(chain_id)) q.filter(revert_log::Column::ChainId.eq(chain_id))
}; };
let revert_logs = q.paginate(&db_conn, page_size).fetch_page(page).await?; let revert_logs = q.paginate(&db_conn, page_size).fetch_page(page).await?;

View File

@ -402,7 +402,7 @@ impl Web3Connections {
// if we get here, something is wrong. clear synced connections // if we get here, something is wrong. clear synced connections
let empty_synced_connections = SyncedConnections::default(); let empty_synced_connections = SyncedConnections::default();
let old_synced_connections = self let _ = self
.synced_connections .synced_connections
.swap(Arc::new(empty_synced_connections)); .swap(Arc::new(empty_synced_connections));

View File

@ -4,7 +4,7 @@ use crate::frontend::authorization::AuthorizedRequest;
use crate::metered::{JsonRpcErrorCount, ProviderErrorCount}; use crate::metered::{JsonRpcErrorCount, ProviderErrorCount};
use anyhow::Context; use anyhow::Context;
use chrono::Utc; use chrono::Utc;
use entities::revert_logs; use entities::revert_log;
use entities::sea_orm_active_enums::Method; use entities::sea_orm_active_enums::Method;
use ethers::providers::{HttpClientError, ProviderError, WsClientError}; use ethers::providers::{HttpClientError, ProviderError, WsClientError};
use ethers::types::{Address, Bytes}; use ethers::types::{Address, Bytes};
@ -12,9 +12,7 @@ use metered::metered;
use metered::HitCount; use metered::HitCount;
use metered::ResponseTime; use metered::ResponseTime;
use metered::Throughput; use metered::Throughput;
use num_traits::cast::FromPrimitive;
use rand::Rng; use rand::Rng;
use sea_orm::prelude::Decimal;
use sea_orm::ActiveEnum; use sea_orm::ActiveEnum;
use sea_orm::ActiveModelTrait; use sea_orm::ActiveModelTrait;
use serde_json::json; use serde_json::json;
@ -94,7 +92,7 @@ impl AuthorizedRequest {
.expect("address should always convert to a Vec<u8>"); .expect("address should always convert to a Vec<u8>");
let call_data = params.data.map(|x| format!("{}", x)); let call_data = params.data.map(|x| format!("{}", x));
let rl = revert_logs::ActiveModel { let rl = revert_log::ActiveModel {
rpc_key_id: sea_orm::Set(authorized_request.rpc_key_id), rpc_key_id: sea_orm::Set(authorized_request.rpc_key_id),
method: sea_orm::Set(method), method: sea_orm::Set(method),
to: sea_orm::Set(to), to: sea_orm::Set(to),
@ -222,16 +220,13 @@ impl OpenRequestHandle {
} else { } else {
let log_revert_chance = y.log_revert_chance; let log_revert_chance = y.log_revert_chance;
if log_revert_chance.is_zero() { if log_revert_chance == 0.0 {
trace!(%method, "no chance. skipping save on revert"); trace!(%method, "no chance. skipping save on revert");
RequestErrorHandler::DebugLevel RequestErrorHandler::DebugLevel
} else if log_revert_chance == Decimal::ONE { } else if log_revert_chance == 1.0 {
trace!(%method, "gaurenteed chance. SAVING on revert"); trace!(%method, "gaurenteed chance. SAVING on revert");
error_handler error_handler
} else if Decimal::from_f32(rand::thread_rng().gen_range(0.0f32..=1.0)) } else if rand::thread_rng().gen_range(0.0f64..=1.0) > log_revert_chance {
.expect("f32 should always convert to a Decimal")
> log_revert_chance
{
trace!(%method, "missed chance. skipping save on revert"); trace!(%method, "missed chance. skipping save on revert");
RequestErrorHandler::DebugLevel RequestErrorHandler::DebugLevel
} else { } else {

View File

@ -4,7 +4,7 @@ use axum::{
TypedHeader, TypedHeader,
}; };
use chrono::NaiveDateTime; use chrono::NaiveDateTime;
use entities::{rpc_accounting, rpc_keys}; use entities::{rpc_accounting, rpc_key};
use hashbrown::HashMap; use hashbrown::HashMap;
use migration::Expr; use migration::Expr;
use num::Zero; use num::Zero;
@ -272,11 +272,11 @@ pub async fn get_aggregate_rpc_stats_from_params(
// TODO: are these joins correct? // TODO: are these joins correct?
// TODO: what about keys where they are the secondary users? // TODO: what about keys where they are the secondary users?
let q = q let q = q
.join(JoinType::InnerJoin, rpc_accounting::Relation::RpcKeys.def()) .join(JoinType::InnerJoin, rpc_accounting::Relation::RpcKey.def())
.column(rpc_keys::Column::UserId) .column(rpc_key::Column::UserId)
.group_by(rpc_keys::Column::UserId); .group_by(rpc_key::Column::UserId);
let condition = condition.add(rpc_keys::Column::UserId.eq(user_id)); let condition = condition.add(rpc_key::Column::UserId.eq(user_id));
(condition, q) (condition, q)
}; };
@ -394,21 +394,20 @@ pub async fn get_detailed_stats(
// TODO: move authentication here? // TODO: move authentication here?
// TODO: what about keys where this user is a secondary user? // TODO: what about keys where this user is a secondary user?
let q = q let q = q
.join(JoinType::InnerJoin, rpc_accounting::Relation::RpcKeys.def()) .join(JoinType::InnerJoin, rpc_accounting::Relation::RpcKey.def())
.column(rpc_keys::Column::UserId) .column(rpc_key::Column::UserId)
.group_by(rpc_keys::Column::UserId); .group_by(rpc_key::Column::UserId);
let condition = condition.add(rpc_keys::Column::UserId.eq(user_id)); let condition = condition.add(rpc_key::Column::UserId.eq(user_id));
let q = if rpc_key_id == 0 { let q = if rpc_key_id == 0 {
q.column(rpc_keys::Column::UserId) q.column(rpc_key::Column::UserId)
.group_by(rpc_keys::Column::UserId) .group_by(rpc_key::Column::UserId)
} else { } else {
response.insert("rpc_key_id", serde_json::to_value(rpc_key_id)?); response.insert("rpc_key_id", serde_json::to_value(rpc_key_id)?);
// no need to group_by user_id when we are grouping by key_id // no need to group_by user_id when we are grouping by key_id
q.column(rpc_keys::Column::Id) q.column(rpc_key::Column::Id).group_by(rpc_key::Column::Id)
.group_by(rpc_keys::Column::Id)
}; };
(condition, q) (condition, q)