From 366f2c8f848eeba4d8d0defba72abe47732970d1 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 27 Oct 2022 00:12:42 +0000 Subject: [PATCH] rename api_key to rpc_key --- Cargo.lock | 6 +- TODO.md | 11 ++- entities/Cargo.toml | 2 +- entities/src/mod.rs | 2 +- entities/src/prelude.rs | 2 +- entities/src/revert_logs.rs | 10 +- entities/src/rpc_accounting.rs | 14 +-- entities/src/{user_keys.rs => rpc_keys.rs} | 4 +- entities/src/user.rs | 8 +- migration/Cargo.toml | 2 +- migration/src/lib.rs | 2 + .../src/m20220101_000001_create_table.rs | 4 +- .../src/m20221026_230819_rename_user_keys.rs | 49 ++++++++++ redis-rate-limiter/src/lib.rs | 2 +- web3_proxy/Cargo.toml | 2 +- web3_proxy/src/app.rs | 17 ++-- web3_proxy/src/app_stats.rs | 12 +-- .../src/bin/web3_proxy_cli/create_user.rs | 12 +-- web3_proxy/src/frontend/authorization.rs | 94 ++++++++++--------- web3_proxy/src/frontend/mod.rs | 8 +- web3_proxy/src/frontend/rpc_proxy_http.rs | 8 +- web3_proxy/src/frontend/rpc_proxy_ws.rs | 6 +- web3_proxy/src/frontend/status.rs | 8 +- web3_proxy/src/frontend/users.rs | 54 +++++------ web3_proxy/src/rpcs/blockchain.rs | 2 +- web3_proxy/src/rpcs/request.rs | 2 +- web3_proxy/src/user_queries.rs | 48 +++++----- 27 files changed, 220 insertions(+), 171 deletions(-) rename entities/src/{user_keys.rs => rpc_keys.rs} (96%) create mode 100644 migration/src/m20221026_230819_rename_user_keys.rs diff --git a/Cargo.lock b/Cargo.lock index 53e207a6..a93f99a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1348,7 +1348,7 @@ dependencies = [ [[package]] name = "entities" -version = "0.5.0" +version = "0.6.0" dependencies = [ "sea-orm", "serde", @@ -2657,7 +2657,7 @@ dependencies = [ [[package]] name = "migration" -version = "0.5.0" +version = "0.6.0" dependencies = [ "sea-orm-migration", "tokio", @@ -5526,7 +5526,7 @@ dependencies = [ [[package]] name = "web3_proxy" -version = "0.5.0" +version = "0.6.0" dependencies = [ "anyhow", "arc-swap", diff --git a/TODO.md b/TODO.md index fcc7f817..103372fa 100644 --- a/TODO.md +++ b/TODO.md @@ -195,8 +195,10 @@ These are roughly in order of completition - [x] send logs to sentry - [x] login should return the user id - [x] when we show keys, also show the key's id +- [x] add config for concurrent requests from public requests - [x] new endpoints for users (not totally sure about the exact paths, but these features are all needed): - [x] sign in + - [x] login should include the key id, not just the key ULID - [x] sign out - [x] GET profile endpoint - [x] POST profile endpoint @@ -208,13 +210,13 @@ These are roughly in order of completition - [x] generate a new key from a web endpoint - [x] modifying key settings such as private relay, revert logging, ip/origin/etc checks - [x] GET logged reverts on an endpoint that **requires authentication**. -- [ ] add config for concurrent requests from public requests +- [ ] rename user_key to rpc_key + - [x] in code + - [ ] in database with a migration - [ ] document url params with examples -- [ ] display concurrent requests per api key (only with authentication!) -- [ ] endpoint for creating/modifying api keys and their advanced security features +- [ ] instead of requests_per_minute on every key, have a "user_tier" that gets joined - [ ] include if archive query or not in the stats - this is already partially done, but we need to double check it works. preferrably with tests -- [ ] login should include the key id, not just the key ULID - [-] add configurable size limits to all the Caches - [ ] instead of configuring each cache with MB sizes, have one value for total memory footprint and then percentages for each cache - [-] let users choose a % to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly @@ -237,6 +239,7 @@ These are roughly in order of completition - might have to use Uuid in sea-orm and then convert to Ulid on display - [ ] add pruning or aggregating or something to log revert trace. otherwise our databases are going to grow really big - [ ] after adding this, allow posting to /user/keys to turn on revert logging +- [ ] display concurrent requests per api key (only with authentication!) ## V1 diff --git a/entities/Cargo.toml b/entities/Cargo.toml index b35a7976..7e4af734 100644 --- a/entities/Cargo.toml +++ b/entities/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "entities" -version = "0.5.0" +version = "0.6.0" edition = "2021" [lib] diff --git a/entities/src/mod.rs b/entities/src/mod.rs index 95b696f2..fbcc2c1f 100644 --- a/entities/src/mod.rs +++ b/entities/src/mod.rs @@ -7,4 +7,4 @@ pub mod rpc_accounting; pub mod sea_orm_active_enums; pub mod secondary_user; pub mod user; -pub mod user_keys; +pub mod rpc_keys; diff --git a/entities/src/prelude.rs b/entities/src/prelude.rs index 03a1bcc6..ae61d8d4 100644 --- a/entities/src/prelude.rs +++ b/entities/src/prelude.rs @@ -2,6 +2,6 @@ pub use super::revert_logs::Entity as RevertLogs; pub use super::rpc_accounting::Entity as RpcAccounting; +pub use super::rpc_keys::Entity as UserKeys; pub use super::secondary_user::Entity as SecondaryUser; pub use super::user::Entity as User; -pub use super::user_keys::Entity as UserKeys; diff --git a/entities/src/revert_logs.rs b/entities/src/revert_logs.rs index 2cc2454e..3e323650 100644 --- a/entities/src/revert_logs.rs +++ b/entities/src/revert_logs.rs @@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize}; pub struct Model { #[sea_orm(primary_key)] pub id: u64, - pub user_key_id: u64, + pub rpc_key_id: u64, pub timestamp: DateTimeUtc, pub method: Method, pub to: Vec, @@ -21,16 +21,16 @@ pub struct Model { #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] pub enum Relation { #[sea_orm( - belongs_to = "super::user_keys::Entity", - from = "Column::UserKeyId", - to = "super::user_keys::Column::Id", + belongs_to = "super::rpc_keys::Entity", + from = "Column::RpcKeyId", + to = "super::rpc_keys::Column::Id", on_update = "NoAction", on_delete = "NoAction" )] UserKeys, } -impl Related for Entity { +impl Related for Entity { fn to() -> RelationDef { Relation::UserKeys.def() } diff --git a/entities/src/rpc_accounting.rs b/entities/src/rpc_accounting.rs index 2e239654..5f57adc4 100644 --- a/entities/src/rpc_accounting.rs +++ b/entities/src/rpc_accounting.rs @@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize}; pub struct Model { #[sea_orm(primary_key)] pub id: u64, - pub user_key_id: u64, + pub rpc_key_id: u64, pub chain_id: u64, pub method: String, pub error_response: bool, @@ -47,18 +47,18 @@ pub struct Model { #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] pub enum Relation { #[sea_orm( - belongs_to = "super::user_keys::Entity", - from = "Column::UserKeyId", - to = "super::user_keys::Column::Id", + belongs_to = "super::rpc_keys::Entity", + from = "Column::RpcKeyId", + to = "super::rpc_keys::Column::Id", on_update = "NoAction", on_delete = "NoAction" )] - UserKeys, + RpcKeys, } -impl Related for Entity { +impl Related for Entity { fn to() -> RelationDef { - Relation::UserKeys.def() + Relation::RpcKeys.def() } } diff --git a/entities/src/user_keys.rs b/entities/src/rpc_keys.rs similarity index 96% rename from entities/src/user_keys.rs rename to entities/src/rpc_keys.rs index dfd57bf0..dc4bb5d1 100644 --- a/entities/src/user_keys.rs +++ b/entities/src/rpc_keys.rs @@ -4,13 +4,13 @@ use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] -#[sea_orm(table_name = "user_keys")] +#[sea_orm(table_name = "rpc_keys")] pub struct Model { #[sea_orm(primary_key)] pub id: u64, pub user_id: u64, #[sea_orm(unique)] - pub api_key: Uuid, + pub rpc_key: Uuid, pub description: Option, pub private_txs: bool, pub active: bool, diff --git a/entities/src/user.rs b/entities/src/user.rs index b468db4b..a57dd584 100644 --- a/entities/src/user.rs +++ b/entities/src/user.rs @@ -18,8 +18,8 @@ pub struct Model { pub enum Relation { #[sea_orm(has_many = "super::secondary_user::Entity")] SecondaryUser, - #[sea_orm(has_many = "super::user_keys::Entity")] - UserKeys, + #[sea_orm(has_many = "super::rpc_keys::Entity")] + RpcKeys, } impl Related for Entity { @@ -28,9 +28,9 @@ impl Related for Entity { } } -impl Related for Entity { +impl Related for Entity { fn to() -> RelationDef { - Relation::UserKeys.def() + Relation::RpcKeys.def() } } diff --git a/migration/Cargo.toml b/migration/Cargo.toml index 8c89f6bb..90283714 100644 --- a/migration/Cargo.toml +++ b/migration/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "migration" -version = "0.5.0" +version = "0.6.0" edition = "2021" publish = false diff --git a/migration/src/lib.rs b/migration/src/lib.rs index 8a1b535b..6a3aab63 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -5,6 +5,7 @@ mod m20220921_181610_log_reverts; mod m20220928_015108_concurrency_limits; mod m20221007_213828_accounting; mod m20221025_210326_add_chain_id_to_reverts; +mod m20221026_230819_rename_user_keys; pub struct Migrator; @@ -17,6 +18,7 @@ impl MigratorTrait for Migrator { Box::new(m20220928_015108_concurrency_limits::Migration), Box::new(m20221007_213828_accounting::Migration), Box::new(m20221025_210326_add_chain_id_to_reverts::Migration), + Box::new(m20221026_230819_rename_user_keys::Migration), ] } } diff --git a/migration/src/m20220101_000001_create_table.rs b/migration/src/m20220101_000001_create_table.rs index a0e4261a..d1c3636c 100644 --- a/migration/src/m20220101_000001_create_table.rs +++ b/migration/src/m20220101_000001_create_table.rs @@ -174,8 +174,8 @@ enum SecondaryUser { /* -- TODO: foreign keys --- TODO: index on api_key --- TODO: what size for api_key +-- TODO: index on rpc_key +-- TODO: what size for rpc_key -- TODO: track active with a timestamp? -- TODO: creation time? -- TODO: requests_per_day? requests_per_second?, diff --git a/migration/src/m20221026_230819_rename_user_keys.rs b/migration/src/m20221026_230819_rename_user_keys.rs new file mode 100644 index 00000000..95599ef5 --- /dev/null +++ b/migration/src/m20221026_230819_rename_user_keys.rs @@ -0,0 +1,49 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .rename_table( + Table::rename() + .table(Alias::new("user_keys"), Alias::new("rpc_keys")) + .to_owned(), + ) + .await?; + + manager + .alter_table( + Table::alter() + .table(Alias::new("rpc_keys")) + .rename_column(Alias::new("api_key"), Alias::new("rpc_key")) + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Alias::new("rpc_keys")) + .rename_column(Alias::new("rpc_key"), Alias::new("api_key")) + .to_owned(), + ) + .await?; + + manager + .rename_table( + Table::rename() + .table(Alias::new("rpc_keys"), Alias::new("user_keys")) + .to_owned(), + ) + .await?; + + Ok(()) + } +} diff --git a/redis-rate-limiter/src/lib.rs b/redis-rate-limiter/src/lib.rs index 4974084f..6f9341f4 100644 --- a/redis-rate-limiter/src/lib.rs +++ b/redis-rate-limiter/src/lib.rs @@ -64,7 +64,7 @@ impl RedisRateLimiter { Instant::now().add(Duration::from_secs_f32(seconds_left_in_period)) } - /// label might be an ip address or a user_key id. + /// label might be an ip address or a rpc_key id. /// if setting max_per_period, be sure to keep the period the same for all requests to this label pub async fn throttle_label( &self, diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 0ca81f9c..b54385bc 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "web3_proxy" -version = "0.5.0" +version = "0.6.0" edition = "2021" default-run = "web3_proxy" diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index 6424afed..00553d9a 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -63,12 +63,11 @@ type ResponseCache = pub type AnyhowJoinHandle = JoinHandle>; #[derive(Clone, Debug, Default, From)] -/// TODO: rename this? pub struct UserKeyData { /// database id of the primary user pub user_id: u64, - /// database id of the api key - pub user_key_id: u64, + /// database id of the rpc key + pub rpc_key_id: u64, /// if None, allow unlimited queries pub max_requests_per_period: Option, // if None, allow unlimited concurrent requests @@ -109,8 +108,8 @@ pub struct Web3ProxyApp { pub frontend_key_rate_limiter: Option>, pub login_rate_limiter: Option, pub vredis_pool: Option, - pub user_key_cache: Cache, - pub user_key_semaphores: Cache, hashbrown::hash_map::DefaultHashBuilder>, + pub rpc_key_cache: Cache, + pub rpc_key_semaphores: Cache, hashbrown::hash_map::DefaultHashBuilder>, pub ip_semaphores: Cache, hashbrown::hash_map::DefaultHashBuilder>, pub bearer_token_semaphores: Cache, hashbrown::hash_map::DefaultHashBuilder>, @@ -449,7 +448,7 @@ impl Web3ProxyApp { // if there is no database of users, there will be no keys and so this will be empty // TODO: max_capacity from config // TODO: ttl from config - let user_key_cache = Cache::builder() + let rpc_key_cache = Cache::builder() .max_capacity(10_000) .time_to_live(Duration::from_secs(60)) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new()); @@ -462,7 +461,7 @@ impl Web3ProxyApp { let ip_semaphores = Cache::builder() .time_to_idle(Duration::from_secs(120)) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new()); - let user_key_semaphores = Cache::builder() + let rpc_key_semaphores = Cache::builder() .time_to_idle(Duration::from_secs(120)) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new()); @@ -481,10 +480,10 @@ impl Web3ProxyApp { vredis_pool, app_metrics, open_request_handle_metrics, - user_key_cache, + rpc_key_cache, bearer_token_semaphores, ip_semaphores, - user_key_semaphores, + rpc_key_semaphores, stat_sender, }; diff --git a/web3_proxy/src/app_stats.rs b/web3_proxy/src/app_stats.rs index d97eab02..303dc671 100644 --- a/web3_proxy/src/app_stats.rs +++ b/web3_proxy/src/app_stats.rs @@ -18,7 +18,7 @@ use tracing::{error, info, trace}; /// TODO: can we use something inside sea_orm instead? #[derive(Debug)] pub struct ProxyResponseStat { - user_key_id: u64, + rpc_key_id: u64, method: String, period_seconds: u64, period_timestamp: u64, @@ -56,7 +56,7 @@ impl Default for ProxyResponseHistograms { // TODO: impl From for our database model pub struct ProxyResponseAggregate { // these are the key - // user_key_id: u64, + // rpc_key_id: u64, // method: String, // error_response: bool, // TODO: this is the grandparent key. get it from there somehow @@ -75,7 +75,7 @@ pub struct ProxyResponseAggregate { #[derive(Clone, Debug, From, Hash, PartialEq, Eq, PartialOrd, Ord)] pub struct UserProxyResponseKey { - user_key_id: u64, + rpc_key_id: u64, method: String, error_response: bool, } @@ -128,7 +128,7 @@ impl ProxyResponseStat { let response_millis = metadata.start_instant.elapsed().as_millis() as u64; Self { - user_key_id: authorized_key.user_key_id, + rpc_key_id: authorized_key.rpc_key_id, method, backend_requests, period_seconds, @@ -286,7 +286,7 @@ impl StatEmitter { let stat = rpc_accounting::ActiveModel { id: sea_orm::NotSet, - user_key_id: sea_orm::Set(k.user_key_id), + rpc_key_id: sea_orm::Set(k.rpc_key_id), chain_id: sea_orm::Set(self.chain_id), method: sea_orm::Set(k.method.clone()), error_response: sea_orm::Set(k.error_response), @@ -356,7 +356,7 @@ impl StatEmitter { }) .await; - let key = (stat.user_key_id, stat.method, stat.error_response).into(); + let key = (stat.rpc_key_id, stat.method, stat.error_response).into(); let user_aggregate = user_cache .get_with(key, async move { diff --git a/web3_proxy/src/bin/web3_proxy_cli/create_user.rs b/web3_proxy/src/bin/web3_proxy_cli/create_user.rs index 30322cd8..ede684dd 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/create_user.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/create_user.rs @@ -1,6 +1,6 @@ use anyhow::Context; use argh::FromArgs; -use entities::{user, user_keys}; +use entities::{rpc_keys, user}; use ethers::prelude::Address; use sea_orm::{ActiveModelTrait, TransactionTrait}; use tracing::info; @@ -25,7 +25,7 @@ pub struct CreateUserSubCommand { /// the user's first api ULID or UUID key. /// If none given, one will be created. #[argh(option)] - api_key: RpcApiKey, + rpc_key: RpcApiKey, /// the key's maximum requests per minute. /// Default to "None" which the code sees as "unlimited" requests. @@ -74,9 +74,9 @@ impl CreateUserSubCommand { ); // create a key for the new user - let uk = user_keys::ActiveModel { + let uk = rpc_keys::ActiveModel { user_id: u.id, - api_key: sea_orm::Set(self.api_key.into()), + rpc_key: sea_orm::Set(self.rpc_key.into()), requests_per_minute: sea_orm::Set(self.rpm), description: sea_orm::Set(self.description), ..Default::default() @@ -87,8 +87,8 @@ impl CreateUserSubCommand { txn.commit().await?; - info!("user key as ULID: {}", Ulid::from(self.api_key)); - info!("user key as UUID: {}", Uuid::from(self.api_key)); + info!("user key as ULID: {}", Ulid::from(self.rpc_key)); + info!("user key as UUID: {}", Uuid::from(self.rpc_key)); Ok(()) } diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 9c4a7d91..be4dce99 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -9,7 +9,7 @@ use axum::headers::{Header, Origin, Referer, UserAgent}; use axum::TypedHeader; use chrono::Utc; use deferred_rate_limiter::DeferredRateLimitResult; -use entities::{user, user_keys}; +use entities::{rpc_keys, user}; use http::HeaderValue; use ipnet::IpNet; use redis_rate_limiter::redis::AsyncCommands; @@ -37,11 +37,11 @@ pub enum RateLimitResult { /// contains the IP of the anonymous user /// TODO: option inside or outside the arc? AllowedIp(IpAddr, Option), - /// contains the user_key_id of an authenticated user + /// contains the rpc_key_id of an authenticated user AllowedUser(UserKeyData, Option), /// contains the IP and retry_at of the anonymous user RateLimitedIp(IpAddr, Option), - /// contains the user_key_id and retry_at of an authenticated user key + /// contains the rpc_key_id and retry_at of an authenticated user key RateLimitedUser(UserKeyData, Option), /// This key is not in our database. Deny access! UnknownKey, @@ -52,7 +52,7 @@ pub struct AuthorizedKey { pub ip: IpAddr, pub origin: Option, pub user_id: u64, - pub user_key_id: u64, + pub rpc_key_id: u64, // TODO: just use an f32? even an f16 is probably fine pub log_revert_chance: Decimal, } @@ -178,10 +178,10 @@ impl AuthorizedKey { origin: Option, referer: Option, user_agent: Option, - user_key_data: UserKeyData, + rpc_key_data: UserKeyData, ) -> anyhow::Result { // check ip - match &user_key_data.allowed_ips { + match &rpc_key_data.allowed_ips { None => {} Some(allowed_ips) => { if !allowed_ips.iter().any(|x| x.contains(&ip)) { @@ -191,7 +191,7 @@ impl AuthorizedKey { } // check origin - match (&origin, &user_key_data.allowed_origins) { + match (&origin, &rpc_key_data.allowed_origins) { (None, None) => {} (Some(_), None) => {} (None, Some(_)) => return Err(anyhow::anyhow!("Origin required")), @@ -203,7 +203,7 @@ impl AuthorizedKey { } // check referer - match (referer, &user_key_data.allowed_referers) { + match (referer, &rpc_key_data.allowed_referers) { (None, None) => {} (Some(_), None) => {} (None, Some(_)) => return Err(anyhow::anyhow!("Referer required")), @@ -215,7 +215,7 @@ impl AuthorizedKey { } // check user_agent - match (user_agent, &user_key_data.allowed_user_agents) { + match (user_agent, &rpc_key_data.allowed_user_agents) { (None, None) => {} (Some(_), None) => {} (None, Some(_)) => return Err(anyhow::anyhow!("User agent required")), @@ -229,9 +229,9 @@ impl AuthorizedKey { Ok(Self { ip, origin, - user_id: user_key_data.user_id, - user_key_id: user_key_data.user_key_id, - log_revert_chance: user_key_data.log_revert_chance, + user_id: rpc_key_data.user_id, + rpc_key_id: rpc_key_data.rpc_key_id, + log_revert_chance: rpc_key_data.log_revert_chance, }) } } @@ -251,7 +251,7 @@ impl Display for &AuthorizedRequest { match self { AuthorizedRequest::Internal => f.write_str("int"), AuthorizedRequest::Ip(x, _) => f.write_str(&format!("ip-{}", x)), - AuthorizedRequest::User(_, x) => f.write_str(&format!("uk-{}", x.user_key_id)), + AuthorizedRequest::User(_, x) => f.write_str(&format!("uk-{}", x.rpc_key_id)), } } } @@ -296,14 +296,14 @@ pub async fn ip_is_authorized( pub async fn key_is_authorized( app: &Web3ProxyApp, - user_key: RpcApiKey, + rpc_key: RpcApiKey, ip: IpAddr, origin: Option, referer: Option, user_agent: Option, ) -> Result<(AuthorizedRequest, Option), FrontendErrorResponse> { // check the rate limits. error if over the limit - let (user_data, semaphore) = match app.rate_limit_by_key(user_key).await? { + let (user_data, semaphore) = match app.rate_limit_by_key(rpc_key).await? { RateLimitResult::AllowedUser(x, semaphore) => (x, semaphore), RateLimitResult::RateLimitedUser(x, retry_at) => { return Err(FrontendErrorResponse::RateLimitedUser(x, retry_at)); @@ -321,6 +321,7 @@ pub async fn key_is_authorized( } impl Web3ProxyApp { + /// Limit the number of concurrent requests from the given ip address. pub async fn ip_semaphore(&self, ip: IpAddr) -> anyhow::Result> { if let Some(max_concurrent_requests) = self.config.public_max_concurrent_requests { let semaphore = self @@ -345,16 +346,17 @@ impl Web3ProxyApp { } } - pub async fn user_key_semaphore( + /// Limit the number of concurrent requests from the given key address. + pub async fn user_rpc_key_semaphore( &self, - user_data: &UserKeyData, + rpc_key_data: &UserKeyData, ) -> anyhow::Result> { - if let Some(max_concurrent_requests) = user_data.max_concurrent_requests { + if let Some(max_concurrent_requests) = rpc_key_data.max_concurrent_requests { let semaphore = self - .user_key_semaphores - .get_with(user_data.user_key_id, async move { + .rpc_key_semaphores + .get_with(rpc_key_data.rpc_key_id, async move { let s = Semaphore::new(max_concurrent_requests as usize); - trace!("new semaphore for user_key_id {}", user_data.user_key_id); + trace!("new semaphore for rpc_key_id {}", rpc_key_data.rpc_key_id); Arc::new(s) }) .await; @@ -493,30 +495,30 @@ impl Web3ProxyApp { } // check the local cache for user data, or query the database - pub(crate) async fn user_data(&self, user_key: RpcApiKey) -> anyhow::Result { + pub(crate) async fn user_data(&self, rpc_key: RpcApiKey) -> anyhow::Result { let user_data: Result<_, Arc> = self - .user_key_cache - .try_get_with(user_key.into(), async move { - trace!(?user_key, "user_cache miss"); + .rpc_key_cache + .try_get_with(rpc_key.into(), async move { + trace!(?rpc_key, "user_cache miss"); let db_conn = self.db_conn().context("Getting database connection")?; - let user_uuid: Uuid = user_key.into(); + let rpc_key: Uuid = rpc_key.into(); // TODO: join the user table to this to return the User? we don't always need it // TODO: also attach secondary users - match user_keys::Entity::find() - .filter(user_keys::Column::ApiKey.eq(user_uuid)) - .filter(user_keys::Column::Active.eq(true)) + match rpc_keys::Entity::find() + .filter(rpc_keys::Column::RpcKey.eq(rpc_key)) + .filter(rpc_keys::Column::Active.eq(true)) .one(&db_conn) .await? { - Some(user_key_model) => { + Some(rpc_key_model) => { // TODO: move these splits into helper functions // TODO: can we have sea orm handle this for us? let allowed_ips: Option> = - if let Some(allowed_ips) = user_key_model.allowed_ips { + if let Some(allowed_ips) = rpc_key_model.allowed_ips { let x = allowed_ips .split(',') .map(|x| x.parse::()) @@ -527,7 +529,7 @@ impl Web3ProxyApp { }; let allowed_origins: Option> = - if let Some(allowed_origins) = user_key_model.allowed_origins { + if let Some(allowed_origins) = rpc_key_model.allowed_origins { // TODO: do this without collecting twice? let x = allowed_origins .split(',') @@ -543,7 +545,7 @@ impl Web3ProxyApp { }; let allowed_referers: Option> = - if let Some(allowed_referers) = user_key_model.allowed_referers { + if let Some(allowed_referers) = rpc_key_model.allowed_referers { let x = allowed_referers .split(',') .map(|x| x.parse::()) @@ -555,7 +557,7 @@ impl Web3ProxyApp { }; let allowed_user_agents: Option> = - if let Some(allowed_user_agents) = user_key_model.allowed_user_agents { + if let Some(allowed_user_agents) = rpc_key_model.allowed_user_agents { let x: Result, _> = allowed_user_agents .split(',') .map(|x| x.parse::()) @@ -567,15 +569,15 @@ impl Web3ProxyApp { }; Ok(UserKeyData { - user_id: user_key_model.user_id, - user_key_id: user_key_model.id, - max_requests_per_period: user_key_model.requests_per_minute, - max_concurrent_requests: user_key_model.max_concurrent_requests, + user_id: rpc_key_model.user_id, + rpc_key_id: rpc_key_model.id, + max_requests_per_period: rpc_key_model.requests_per_minute, + max_concurrent_requests: rpc_key_model.max_concurrent_requests, allowed_ips, allowed_origins, allowed_referers, allowed_user_agents, - log_revert_chance: user_key_model.log_revert_chance, + log_revert_chance: rpc_key_model.log_revert_chance, }) } None => Ok(UserKeyData::default()), @@ -587,14 +589,14 @@ impl Web3ProxyApp { user_data.map_err(|err| anyhow::anyhow!(err)) } - pub async fn rate_limit_by_key(&self, user_key: RpcApiKey) -> anyhow::Result { - let user_data = self.user_data(user_key).await?; + pub async fn rate_limit_by_key(&self, rpc_key: RpcApiKey) -> anyhow::Result { + let user_data = self.user_data(rpc_key).await?; - if user_data.user_key_id == 0 { + if user_data.rpc_key_id == 0 { return Ok(RateLimitResult::UnknownKey); } - let semaphore = self.user_key_semaphore(&user_data).await?; + let semaphore = self.user_rpc_key_semaphore(&user_data).await?; let user_max_requests_per_period = match user_data.max_requests_per_period { None => { @@ -606,7 +608,7 @@ impl Web3ProxyApp { // user key is valid. now check rate limits if let Some(rate_limiter) = &self.frontend_key_rate_limiter { match rate_limiter - .throttle(user_key.into(), Some(user_max_requests_per_period), 1) + .throttle(rpc_key.into(), Some(user_max_requests_per_period), 1) .await { Ok(DeferredRateLimitResult::Allowed) => { @@ -618,12 +620,12 @@ impl Web3ProxyApp { // this is too verbose, but a stat might be good // TODO: keys are secrets! use the id instead // TODO: emit a stat - trace!(?user_key, "rate limit exceeded until {:?}", retry_at); + trace!(?rpc_key, "rate limit exceeded until {:?}", retry_at); Ok(RateLimitResult::RateLimitedUser(user_data, Some(retry_at))) } Ok(DeferredRateLimitResult::RetryNever) => { // TODO: keys are secret. don't log them! - trace!(?user_key, "rate limit is 0"); + trace!(?rpc_key, "rate limit is 0"); // TODO: emit a stat Ok(RateLimitResult::RateLimitedUser(user_data, None)) } diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index d0a860f1..908897b3 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -55,11 +55,11 @@ pub async fn serve(port: u16, proxy_app: Arc) -> anyhow::Result<() .route("/", post(rpc_proxy_http::proxy_web3_rpc)) .route("/", get(rpc_proxy_ws::websocket_handler)) .route( - "/rpc/:user_key", + "/rpc/:rpc_key", post(rpc_proxy_http::proxy_web3_rpc_with_key), ) .route( - "/rpc/:user_key", + "/rpc/:rpc_key", get(rpc_proxy_ws::websocket_handler_with_key), ) .route("/health", get(status::health)) @@ -73,8 +73,8 @@ pub async fn serve(port: u16, proxy_app: Arc) -> anyhow::Result<() .route("/user", post(users::user_post)) .route("/user/balance", get(users::user_balance_get)) .route("/user/balance/:txid", post(users::user_balance_post)) - .route("/user/keys", get(users::user_keys_get)) - .route("/user/keys", post(users::user_keys_post)) + .route("/user/keys", get(users::rpc_keys_get)) + .route("/user/keys", post(users::rpc_keys_post)) .route("/user/revert_logs", get(users::user_revert_logs_get)) .route( "/user/stats/aggregate", diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs index 61604f94..f4834d3f 100644 --- a/web3_proxy/src/frontend/rpc_proxy_http.rs +++ b/web3_proxy/src/frontend/rpc_proxy_http.rs @@ -32,7 +32,7 @@ pub async fn proxy_web3_rpc( let authorized_request = Arc::new(authorized_request); - // TODO: spawn earlier? + // TODO: spawn earlier? i think we want ip_is_authorized in this future let f = tokio::spawn(async move { app.proxy_web3_rpc(authorized_request, payload) .instrument(request_span) @@ -56,15 +56,15 @@ pub async fn proxy_web3_rpc_with_key( origin: Option>, referer: Option>, user_agent: Option>, - Path(user_key): Path, + Path(rpc_key): Path, ) -> FrontendResult { - let user_key = user_key.parse()?; + let rpc_key = rpc_key.parse()?; let request_span = error_span!("request", %ip, ?referer, ?user_agent); let (authorized_request, _semaphore) = key_is_authorized( &app, - user_key, + rpc_key, ip, origin.map(|x| x.0), referer.map(|x| x.0), diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index 1b879aa4..915bc6db 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -78,19 +78,19 @@ pub async fn websocket_handler( pub async fn websocket_handler_with_key( Extension(app): Extension>, ClientIp(ip): ClientIp, - Path(user_key): Path, + Path(rpc_key): Path, origin: Option>, referer: Option>, user_agent: Option>, ws_upgrade: Option, ) -> FrontendResult { - let user_key = user_key.parse()?; + let rpc_key = rpc_key.parse()?; let request_span = error_span!("request", %ip, ?referer, ?user_agent); let (authorized_request, _semaphore) = key_is_authorized( &app, - user_key, + rpc_key, ip, origin.map(|x| x.0), referer.map(|x| x.0), diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs index 6e594867..0ba548d9 100644 --- a/web3_proxy/src/frontend/status.rs +++ b/web3_proxy/src/frontend/status.rs @@ -5,10 +5,10 @@ use crate::app::Web3ProxyApp; use axum::{http::StatusCode, response::IntoResponse, Extension, Json}; +use axum_macros::debug_handler; use moka::future::ConcurrentCacheExt; use serde_json::json; use std::sync::Arc; -use axum_macros::debug_handler; /// Health check page for load balancers to use. #[debug_handler] @@ -35,14 +35,14 @@ pub async fn prometheus(Extension(app): Extension>) -> impl In #[debug_handler] pub async fn status(Extension(app): Extension>) -> impl IntoResponse { app.pending_transactions.sync(); - app.user_key_cache.sync(); + app.rpc_key_cache.sync(); // TODO: what else should we include? uptime, cache hit rates, cpu load let body = json!({ "pending_transactions_count": app.pending_transactions.entry_count(), "pending_transactions_size": app.pending_transactions.weighted_size(), - "user_cache_count": app.user_key_cache.entry_count(), - "user_cache_size": app.user_key_cache.weighted_size(), + "user_cache_count": app.rpc_key_cache.entry_count(), + "user_cache_size": app.rpc_key_cache.weighted_size(), "balanced_rpcs": app.balanced_rpcs, "private_rpcs": app.private_rpcs, }); diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index eb603b74..f58a9cd3 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -17,7 +17,7 @@ use axum::{ }; use axum_client_ip::ClientIp; use axum_macros::debug_handler; -use entities::{revert_logs, user, user_keys}; +use entities::{revert_logs, rpc_keys, user}; use ethers::{prelude::Address, types::Bytes}; use hashbrown::HashMap; use http::{HeaderValue, StatusCode}; @@ -260,9 +260,9 @@ pub async fn user_login_post( let rpc_key = RpcApiKey::new(); // TODO: variable requests per minute depending on the invite code - let uk = user_keys::ActiveModel { + let uk = rpc_keys::ActiveModel { user_id: sea_orm::Set(u.id), - api_key: sea_orm::Set(rpc_key.into()), + rpc_key: sea_orm::Set(rpc_key.into()), description: sea_orm::Set(Some("first".to_string())), requests_per_minute: sea_orm::Set(app.config.default_user_requests_per_minute), ..Default::default() @@ -282,8 +282,8 @@ pub async fn user_login_post( } Some(u) => { // the user is already registered - let uks = user_keys::Entity::find() - .filter(user_keys::Column::UserId.eq(u.id)) + let uks = rpc_keys::Entity::find() + .filter(rpc_keys::Column::UserId.eq(u.id)) .all(&db_conn) .await .context("failed loading user's key")?; @@ -298,7 +298,7 @@ pub async fn user_login_post( // json response with everything in it // we could return just the bearer token, but I think they will always request api keys and the user profile let response_json = json!({ - "api_keys": uks + "rpc_keys": uks .into_iter() .map(|uk| (uk.id, uk)) .collect::>(), @@ -411,7 +411,7 @@ pub async fn user_post( /// - show balance in USD /// - show deposits history (currency, amounts, transaction id) /// -/// TODO: one key per request? maybe /user/balance/:api_key? +/// TODO: one key per request? maybe /user/balance/:rpc_key? /// TODO: this will change as we add better support for secondary users. #[debug_handler] pub async fn user_balance_get( @@ -428,7 +428,7 @@ pub async fn user_balance_get( /// We will subscribe to events to watch for any user deposits, but sometimes events can be missed. /// /// TODO: rate limit by user -/// TODO: one key per request? maybe /user/balance/:api_key? +/// TODO: one key per request? maybe /user/balance/:rpc_key? /// TODO: this will change as we add better support for secondary users. #[debug_handler] pub async fn user_balance_post( @@ -442,9 +442,9 @@ pub async fn user_balance_post( /// `GET /user/keys` -- Use a bearer token to get the user's api keys and their settings. /// -/// TODO: one key per request? maybe /user/keys/:api_key? +/// TODO: one key per request? maybe /user/keys/:rpc_key? #[debug_handler] -pub async fn user_keys_get( +pub async fn rpc_keys_get( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, ) -> FrontendResult { @@ -452,8 +452,8 @@ pub async fn user_keys_get( let db_conn = app.db_conn().context("getting db to fetch user's keys")?; - let uks = user_keys::Entity::find() - .filter(user_keys::Column::UserId.eq(user.id)) + let uks = rpc_keys::Entity::find() + .filter(rpc_keys::Column::UserId.eq(user.id)) .all(&db_conn) .await .context("failed loading user's key")?; @@ -470,7 +470,7 @@ pub async fn user_keys_get( Ok(Json(response_json).into_response()) } -/// the JSON input to the `user_keys_post` handler. +/// the JSON input to the `rpc_keys_post` handler. #[derive(Deserialize)] pub struct UserKeysPost { // TODO: make sure the email address is valid. probably have a "verified" column in the database @@ -491,9 +491,9 @@ pub struct UserKeysPost { /// `POST /user/keys` -- Use a bearer token to create a new key or modify an existing key. /// /// TODO: read json from the request body -/// TODO: one key per request? maybe /user/keys/:api_key? +/// TODO: one key per request? maybe /user/keys/:rpc_key? #[debug_handler] -pub async fn user_keys_post( +pub async fn rpc_keys_post( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, Json(payload): Json, @@ -504,9 +504,9 @@ pub async fn user_keys_post( let mut uk = if let Some(existing_key_id) = payload.existing_key_id { // get the key and make sure it belongs to the user - let uk = user_keys::Entity::find() - .filter(user_keys::Column::UserId.eq(user.id)) - .filter(user_keys::Column::Id.eq(existing_key_id)) + let uk = rpc_keys::Entity::find() + .filter(rpc_keys::Column::UserId.eq(user.id)) + .filter(rpc_keys::Column::Id.eq(existing_key_id)) .one(&db_conn) .await .context("failed loading user's key")? @@ -515,9 +515,9 @@ pub async fn user_keys_post( uk.try_into().unwrap() } else if let Some(existing_key) = payload.existing_key { // get the key and make sure it belongs to the user - let uk = user_keys::Entity::find() - .filter(user_keys::Column::UserId.eq(user.id)) - .filter(user_keys::Column::ApiKey.eq(Uuid::from(existing_key))) + let uk = rpc_keys::Entity::find() + .filter(rpc_keys::Column::UserId.eq(user.id)) + .filter(rpc_keys::Column::RpcKey.eq(Uuid::from(existing_key))) .one(&db_conn) .await .context("failed loading user's key")? @@ -529,9 +529,9 @@ pub async fn user_keys_post( // TODO: limit to 10 keys? let rpc_key = RpcApiKey::new(); - user_keys::ActiveModel { + rpc_keys::ActiveModel { user_id: sea_orm::Set(user.id), - api_key: sea_orm::Set(rpc_key.into()), + rpc_key: sea_orm::Set(rpc_key.into()), requests_per_minute: sea_orm::Set(app.config.default_user_requests_per_minute), ..Default::default() } @@ -661,7 +661,7 @@ pub async fn user_keys_post( uk }; - let uk: user_keys::Model = uk.try_into()?; + let uk: rpc_keys::Model = uk.try_into()?; Ok(Json(uk).into_response()) } @@ -691,8 +691,8 @@ pub async fn user_revert_logs_get( let db_conn = app.db_conn().context("getting db for user's revert logs")?; - let uks = user_keys::Entity::find() - .filter(user_keys::Column::UserId.eq(user.id)) + let uks = rpc_keys::Entity::find() + .filter(rpc_keys::Column::UserId.eq(user.id)) .all(&db_conn) .await .context("failed loading user's key")?; @@ -703,7 +703,7 @@ pub async fn user_revert_logs_get( // get paginated logs let q = revert_logs::Entity::find() .filter(revert_logs::Column::Timestamp.gte(query_start)) - .filter(revert_logs::Column::UserKeyId.is_in(uks)) + .filter(revert_logs::Column::RpcKeyId.is_in(uks)) .order_by_asc(revert_logs::Column::Timestamp); let q = if chain_id == 0 { diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 85b19415..09ab7bd5 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -65,7 +65,7 @@ impl Web3Connections { return Ok(()); } - info!(%block_hash, %block_num, "saving new block"); + trace!(%block_hash, %block_num, "saving new block"); self.block_hashes .insert(*block_hash, block.to_owned()) diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 9a922ecc..2218e9c4 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -95,7 +95,7 @@ impl AuthorizedRequest { let call_data = params.data.map(|x| format!("{}", x)); let rl = revert_logs::ActiveModel { - user_key_id: sea_orm::Set(authorized_request.user_key_id), + rpc_key_id: sea_orm::Set(authorized_request.rpc_key_id), method: sea_orm::Set(method), to: sea_orm::Set(to), call_data: sea_orm::Set(call_data), diff --git a/web3_proxy/src/user_queries.rs b/web3_proxy/src/user_queries.rs index cd1d2f4f..27495068 100644 --- a/web3_proxy/src/user_queries.rs +++ b/web3_proxy/src/user_queries.rs @@ -4,7 +4,7 @@ use axum::{ TypedHeader, }; use chrono::NaiveDateTime; -use entities::{rpc_accounting, user_keys}; +use entities::{rpc_accounting, rpc_keys}; use hashbrown::HashMap; use migration::Expr; use num::Zero; @@ -36,7 +36,7 @@ async fn get_user_id_from_params( .get::<_, u64>(bearer_cache_key) .await // TODO: this should be a 403 - .context("fetching user_key_id from redis with bearer_cache_key") + .context("fetching rpc_key_id from redis with bearer_cache_key") } (_, None) => { // they have a bearer token. we don't care about it on public pages @@ -53,15 +53,15 @@ async fn get_user_id_from_params( } } -/// only allow user_key to be set if user_id is also set. +/// only allow rpc_key to be set if user_id is also set. /// this will keep people from reading someone else's keys. /// 0 means none. -pub fn get_user_key_id_from_params( +pub fn get_rpc_key_id_from_params( user_id: u64, params: &HashMap, ) -> anyhow::Result { if user_id > 0 { - params.get("user_key_id").map_or_else( + params.get("rpc_key_id").map_or_else( || Ok(0), |c| { let c = c.parse()?; @@ -266,21 +266,18 @@ pub async fn get_aggregate_rpc_stats_from_params( // TODO: are these joins correct? // TODO: what about keys where they are the secondary users? let q = q - .join( - JoinType::InnerJoin, - rpc_accounting::Relation::UserKeys.def(), - ) - .column(user_keys::Column::UserId) - .group_by(user_keys::Column::UserId); + .join(JoinType::InnerJoin, rpc_accounting::Relation::RpcKeys.def()) + .column(rpc_keys::Column::UserId) + .group_by(rpc_keys::Column::UserId); - let condition = condition.add(user_keys::Column::UserId.eq(user_id)); + let condition = condition.add(rpc_keys::Column::UserId.eq(user_id)); (condition, q) }; let q = q.filter(condition); - // TODO: enum between searching on user_key_id on user_id + // TODO: enum between searching on rpc_key_id on user_id // TODO: handle secondary users, too // log query here. i think sea orm has a useful log level for this @@ -306,7 +303,7 @@ pub async fn get_detailed_stats( let redis_conn = app.redis_conn().await.context("connecting to redis")?; let user_id = get_user_id_from_params(redis_conn, bearer, ¶ms).await?; - let user_key_id = get_user_key_id_from_params(user_id, ¶ms)?; + let rpc_key_id = get_rpc_key_id_from_params(user_id, ¶ms)?; let chain_id = get_chain_id_from_params(app, ¶ms)?; let query_start = get_query_start_from_params(¶ms)?; let query_window_seconds = get_query_window_seconds_from_params(¶ms)?; @@ -390,24 +387,21 @@ pub async fn get_detailed_stats( // TODO: move authentication here? // TODO: what about keys where this user is a secondary user? let q = q - .join( - JoinType::InnerJoin, - rpc_accounting::Relation::UserKeys.def(), - ) - .column(user_keys::Column::UserId) - .group_by(user_keys::Column::UserId); + .join(JoinType::InnerJoin, rpc_accounting::Relation::RpcKeys.def()) + .column(rpc_keys::Column::UserId) + .group_by(rpc_keys::Column::UserId); - let condition = condition.add(user_keys::Column::UserId.eq(user_id)); + let condition = condition.add(rpc_keys::Column::UserId.eq(user_id)); - let q = if user_key_id == 0 { - q.column(user_keys::Column::UserId) - .group_by(user_keys::Column::UserId) + let q = if rpc_key_id == 0 { + q.column(rpc_keys::Column::UserId) + .group_by(rpc_keys::Column::UserId) } else { - response.insert("user_key_id", serde_json::to_value(user_key_id)?); + response.insert("rpc_key_id", serde_json::to_value(rpc_key_id)?); // no need to group_by user_id when we are grouping by key_id - q.column(user_keys::Column::Id) - .group_by(user_keys::Column::Id) + q.column(rpc_keys::Column::Id) + .group_by(rpc_keys::Column::Id) }; (condition, q)