rename api_key to rpc_key

Bryan Stitt 2022-10-27 00:12:42 +00:00
parent 0c3194f445
commit 366f2c8f84
27 changed files with 220 additions and 171 deletions

Cargo.lock (generated, 6 changed lines)

@@ -1348,7 +1348,7 @@ dependencies = [
[[package]]
name = "entities"
version = "0.5.0"
version = "0.6.0"
dependencies = [
"sea-orm",
"serde",
@@ -2657,7 +2657,7 @@ dependencies = [
[[package]]
name = "migration"
version = "0.5.0"
version = "0.6.0"
dependencies = [
"sea-orm-migration",
"tokio",
@@ -5526,7 +5526,7 @@ dependencies = [
[[package]]
name = "web3_proxy"
version = "0.5.0"
version = "0.6.0"
dependencies = [
"anyhow",
"arc-swap",

TODO.md (11 changed lines)

@@ -195,8 +195,10 @@ These are roughly in order of completion
- [x] send logs to sentry
- [x] login should return the user id
- [x] when we show keys, also show the key's id
- [x] add config for concurrent requests from public requests
- [x] new endpoints for users (not totally sure about the exact paths, but these features are all needed):
- [x] sign in
- [x] login should include the key id, not just the key ULID
- [x] sign out
- [x] GET profile endpoint
- [x] POST profile endpoint
@@ -208,13 +210,13 @@ These are roughly in order of completion
- [x] generate a new key from a web endpoint
- [x] modifying key settings such as private relay, revert logging, ip/origin/etc checks
- [x] GET logged reverts on an endpoint that **requires authentication**.
- [ ] add config for concurrent requests from public requests
- [ ] rename user_key to rpc_key
- [x] in code
- [ ] in database with a migration
- [ ] document url params with examples
- [ ] display concurrent requests per api key (only with authentication!)
- [ ] endpoint for creating/modifying api keys and their advanced security features
- [ ] instead of requests_per_minute on every key, have a "user_tier" that gets joined
- [ ] include if archive query or not in the stats
- this is already partially done, but we need to double check it works. preferably with tests
- [ ] login should include the key id, not just the key ULID
- [-] add configurable size limits to all the Caches
- [ ] instead of configuring each cache with MB sizes, have one value for total memory footprint and then percentages for each cache
- [-] let users choose a % to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly
@@ -237,6 +239,7 @@ These are roughly in order of completion
- might have to use Uuid in sea-orm and then convert to Ulid on display
- [ ] add pruning or aggregating or something to log revert trace. otherwise our databases are going to grow really big
- [ ] after adding this, allow posting to /user/keys to turn on revert logging
- [ ] display concurrent requests per api key (only with authentication!)
## V1

@@ -1,6 +1,6 @@
[package]
name = "entities"
version = "0.5.0"
version = "0.6.0"
edition = "2021"
[lib]

@@ -7,4 +7,4 @@ pub mod rpc_accounting;
pub mod sea_orm_active_enums;
pub mod secondary_user;
pub mod user;
pub mod user_keys;
pub mod rpc_keys;

@@ -2,6 +2,6 @@
pub use super::revert_logs::Entity as RevertLogs;
pub use super::rpc_accounting::Entity as RpcAccounting;
pub use super::rpc_keys::Entity as UserKeys;
pub use super::secondary_user::Entity as SecondaryUser;
pub use super::user::Entity as User;
pub use super::user_keys::Entity as UserKeys;

@@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize};
pub struct Model {
#[sea_orm(primary_key)]
pub id: u64,
pub user_key_id: u64,
pub rpc_key_id: u64,
pub timestamp: DateTimeUtc,
pub method: Method,
pub to: Vec<u8>,
@@ -21,16 +21,16 @@ pub struct Model {
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::user_keys::Entity",
from = "Column::UserKeyId",
to = "super::user_keys::Column::Id",
belongs_to = "super::rpc_keys::Entity",
from = "Column::RpcKeyId",
to = "super::rpc_keys::Column::Id",
on_update = "NoAction",
on_delete = "NoAction"
)]
UserKeys,
}
impl Related<super::user_keys::Entity> for Entity {
impl Related<super::rpc_keys::Entity> for Entity {
fn to() -> RelationDef {
Relation::UserKeys.def()
}

@@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize};
pub struct Model {
#[sea_orm(primary_key)]
pub id: u64,
pub user_key_id: u64,
pub rpc_key_id: u64,
pub chain_id: u64,
pub method: String,
pub error_response: bool,
@@ -47,18 +47,18 @@ pub struct Model {
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::user_keys::Entity",
from = "Column::UserKeyId",
to = "super::user_keys::Column::Id",
belongs_to = "super::rpc_keys::Entity",
from = "Column::RpcKeyId",
to = "super::rpc_keys::Column::Id",
on_update = "NoAction",
on_delete = "NoAction"
)]
UserKeys,
RpcKeys,
}
impl Related<super::user_keys::Entity> for Entity {
impl Related<super::rpc_keys::Entity> for Entity {
fn to() -> RelationDef {
Relation::UserKeys.def()
Relation::RpcKeys.def()
}
}

@@ -4,13 +4,13 @@ use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "user_keys")]
#[sea_orm(table_name = "rpc_keys")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: u64,
pub user_id: u64,
#[sea_orm(unique)]
pub api_key: Uuid,
pub rpc_key: Uuid,
pub description: Option<String>,
pub private_txs: bool,
pub active: bool,

@@ -18,8 +18,8 @@ pub struct Model {
pub enum Relation {
#[sea_orm(has_many = "super::secondary_user::Entity")]
SecondaryUser,
#[sea_orm(has_many = "super::user_keys::Entity")]
UserKeys,
#[sea_orm(has_many = "super::rpc_keys::Entity")]
RpcKeys,
}
impl Related<super::secondary_user::Entity> for Entity {
@@ -28,9 +28,9 @@ impl Related<super::secondary_user::Entity> for Entity {
}
}
impl Related<super::user_keys::Entity> for Entity {
impl Related<super::rpc_keys::Entity> for Entity {
fn to() -> RelationDef {
Relation::UserKeys.def()
Relation::RpcKeys.def()
}
}

@@ -1,6 +1,6 @@
[package]
name = "migration"
version = "0.5.0"
version = "0.6.0"
edition = "2021"
publish = false

@@ -5,6 +5,7 @@ mod m20220921_181610_log_reverts;
mod m20220928_015108_concurrency_limits;
mod m20221007_213828_accounting;
mod m20221025_210326_add_chain_id_to_reverts;
mod m20221026_230819_rename_user_keys;
pub struct Migrator;
@@ -17,6 +18,7 @@ impl MigratorTrait for Migrator {
Box::new(m20220928_015108_concurrency_limits::Migration),
Box::new(m20221007_213828_accounting::Migration),
Box::new(m20221025_210326_add_chain_id_to_reverts::Migration),
Box::new(m20221026_230819_rename_user_keys::Migration),
]
}
}

@@ -174,8 +174,8 @@ enum SecondaryUser
/*
-- TODO: foreign keys
-- TODO: index on api_key
-- TODO: what size for api_key
-- TODO: index on rpc_key
-- TODO: what size for rpc_key
-- TODO: track active with a timestamp?
-- TODO: creation time?
-- TODO: requests_per_day? requests_per_second?,

@@ -0,0 +1,49 @@
use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.rename_table(
Table::rename()
.table(Alias::new("user_keys"), Alias::new("rpc_keys"))
.to_owned(),
)
.await?;
manager
.alter_table(
Table::alter()
.table(Alias::new("rpc_keys"))
.rename_column(Alias::new("api_key"), Alias::new("rpc_key"))
.to_owned(),
)
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Alias::new("rpc_keys"))
.rename_column(Alias::new("rpc_key"), Alias::new("api_key"))
.to_owned(),
)
.await?;
manager
.rename_table(
Table::rename()
.table(Alias::new("rpc_keys"), Alias::new("user_keys"))
.to_owned(),
)
.await?;
Ok(())
}
}
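
In raw SQL terms, the `up()` above amounts to `RENAME TABLE user_keys TO rpc_keys` followed by `ALTER TABLE rpc_keys RENAME COLUMN api_key TO rpc_key` (MySQL 8 syntax), and `down()` reverses the same two steps in the opposite order. Below is a minimal sketch of how this migrator is typically driven, assuming the migration crate uses the stock sea-orm-migration CLI entrypoint; its main.rs is not part of this diff:

```rust
// migration/src/main.rs (sketch; assumes the stock sea-orm-migration
// template, using the tokio runtime that Cargo.lock lists as a dependency)
use sea_orm_migration::prelude::*;

#[tokio::main]
async fn main() {
    // parses subcommands such as `up`, `down`, and `status` and runs them
    // against the database pointed to by DATABASE_URL
    cli::run_cli(migration::Migrator).await;
}
```

With that entrypoint, `cargo run -- up` inside the migration crate applies this rename, and `cargo run -- down` rolls it back via the `down()` above.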

@@ -64,7 +64,7 @@ impl RedisRateLimiter {
Instant::now().add(Duration::from_secs_f32(seconds_left_in_period))
}
/// label might be an ip address or a user_key id.
/// label might be an ip address or a rpc_key id.
/// if setting max_per_period, be sure to keep the period the same for all requests to this label
pub async fn throttle_label(
&self,
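
The doc comment is the only part of `throttle_label` fully visible in this hunk. As a hedged illustration of the renamed convention (labeling by rpc key id rather than by the secret key itself), here is a sketch of a possible call site; the trailing `max_per_period` and `count` arguments are assumptions about the truncated signature, not something this diff confirms:

```rust
// Sketch only: assumes a signature roughly like
//   throttle_label(&self, label: &str, max_per_period: Option<u64>, count: u64)
// returning an anyhow::Result; the real parameter list is cut off above.
use redis_rate_limiter::RedisRateLimiter; // assumed crate path

async fn throttle_by_rpc_key_id(
    rate_limiter: &RedisRateLimiter,
    rpc_key_id: u64,
) -> anyhow::Result<()> {
    // label with the database id so the secret key never reaches redis
    let label = format!("rpc_key_id:{}", rpc_key_id);

    // None keeps the limiter's default max_per_period; count one request
    rate_limiter.throttle_label(&label, None, 1).await?;

    Ok(())
}
```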

@@ -1,6 +1,6 @@
[package]
name = "web3_proxy"
version = "0.5.0"
version = "0.6.0"
edition = "2021"
default-run = "web3_proxy"

@@ -63,12 +63,11 @@ type ResponseCache =
pub type AnyhowJoinHandle<T> = JoinHandle<anyhow::Result<T>>;
#[derive(Clone, Debug, Default, From)]
/// TODO: rename this?
pub struct UserKeyData {
/// database id of the primary user
pub user_id: u64,
/// database id of the api key
pub user_key_id: u64,
/// database id of the rpc key
pub rpc_key_id: u64,
/// if None, allow unlimited queries
pub max_requests_per_period: Option<u64>,
// if None, allow unlimited concurrent requests
@@ -109,8 +108,8 @@ pub struct Web3ProxyApp {
pub frontend_key_rate_limiter: Option<DeferredRateLimiter<Ulid>>,
pub login_rate_limiter: Option<RedisRateLimiter>,
pub vredis_pool: Option<RedisPool>,
pub user_key_cache: Cache<Ulid, UserKeyData, hashbrown::hash_map::DefaultHashBuilder>,
pub user_key_semaphores: Cache<u64, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
pub rpc_key_cache: Cache<Ulid, UserKeyData, hashbrown::hash_map::DefaultHashBuilder>,
pub rpc_key_semaphores: Cache<u64, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
pub ip_semaphores: Cache<IpAddr, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
pub bearer_token_semaphores:
Cache<String, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
@@ -449,7 +448,7 @@ impl Web3ProxyApp {
// if there is no database of users, there will be no keys and so this will be empty
// TODO: max_capacity from config
// TODO: ttl from config
let user_key_cache = Cache::builder()
let rpc_key_cache = Cache::builder()
.max_capacity(10_000)
.time_to_live(Duration::from_secs(60))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new());
@@ -462,7 +461,7 @@ impl Web3ProxyApp {
let ip_semaphores = Cache::builder()
.time_to_idle(Duration::from_secs(120))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new());
let user_key_semaphores = Cache::builder()
let rpc_key_semaphores = Cache::builder()
.time_to_idle(Duration::from_secs(120))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new());
@@ -481,10 +480,10 @@ impl Web3ProxyApp {
vredis_pool,
app_metrics,
open_request_handle_metrics,
user_key_cache,
rpc_key_cache,
bearer_token_semaphores,
ip_semaphores,
user_key_semaphores,
rpc_key_semaphores,
stat_sender,
};

@@ -18,7 +18,7 @@ use tracing::{error, info, trace};
/// TODO: can we use something inside sea_orm instead?
#[derive(Debug)]
pub struct ProxyResponseStat {
user_key_id: u64,
rpc_key_id: u64,
method: String,
period_seconds: u64,
period_timestamp: u64,
@@ -56,7 +56,7 @@ impl Default for ProxyResponseHistograms {
// TODO: impl From for our database model
pub struct ProxyResponseAggregate {
// these are the key
// user_key_id: u64,
// rpc_key_id: u64,
// method: String,
// error_response: bool,
// TODO: this is the grandparent key. get it from there somehow
@@ -75,7 +75,7 @@ pub struct ProxyResponseAggregate {
#[derive(Clone, Debug, From, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct UserProxyResponseKey {
user_key_id: u64,
rpc_key_id: u64,
method: String,
error_response: bool,
}
@@ -128,7 +128,7 @@ impl ProxyResponseStat {
let response_millis = metadata.start_instant.elapsed().as_millis() as u64;
Self {
user_key_id: authorized_key.user_key_id,
rpc_key_id: authorized_key.rpc_key_id,
method,
backend_requests,
period_seconds,
@@ -286,7 +286,7 @@ impl StatEmitter {
let stat = rpc_accounting::ActiveModel {
id: sea_orm::NotSet,
user_key_id: sea_orm::Set(k.user_key_id),
rpc_key_id: sea_orm::Set(k.rpc_key_id),
chain_id: sea_orm::Set(self.chain_id),
method: sea_orm::Set(k.method.clone()),
error_response: sea_orm::Set(k.error_response),
@@ -356,7 +356,7 @@ impl StatEmitter {
})
.await;
let key = (stat.user_key_id, stat.method, stat.error_response).into();
let key = (stat.rpc_key_id, stat.method, stat.error_response).into();
let user_aggregate = user_cache
.get_with(key, async move {

@@ -1,6 +1,6 @@
use anyhow::Context;
use argh::FromArgs;
use entities::{user, user_keys};
use entities::{rpc_keys, user};
use ethers::prelude::Address;
use sea_orm::{ActiveModelTrait, TransactionTrait};
use tracing::info;
@@ -25,7 +25,7 @@ pub struct CreateUserSubCommand {
/// the user's first API key, as a ULID or UUID.
/// If none given, one will be created.
#[argh(option)]
api_key: RpcApiKey,
rpc_key: RpcApiKey,
/// the key's maximum requests per minute.
/// Default to "None" which the code sees as "unlimited" requests.
@@ -74,9 +74,9 @@ impl CreateUserSubCommand {
);
// create a key for the new user
let uk = user_keys::ActiveModel {
let uk = rpc_keys::ActiveModel {
user_id: u.id,
api_key: sea_orm::Set(self.api_key.into()),
rpc_key: sea_orm::Set(self.rpc_key.into()),
requests_per_minute: sea_orm::Set(self.rpm),
description: sea_orm::Set(self.description),
..Default::default()
@@ -87,8 +87,8 @@ impl CreateUserSubCommand {
txn.commit().await?;
info!("user key as ULID: {}", Ulid::from(self.api_key));
info!("user key as UUID: {}", Uuid::from(self.api_key));
info!("user key as ULID: {}", Ulid::from(self.rpc_key));
info!("user key as UUID: {}", Uuid::from(self.rpc_key));
Ok(())
}

@@ -9,7 +9,7 @@ use axum::headers::{Header, Origin, Referer, UserAgent};
use axum::TypedHeader;
use chrono::Utc;
use deferred_rate_limiter::DeferredRateLimitResult;
use entities::{user, user_keys};
use entities::{rpc_keys, user};
use http::HeaderValue;
use ipnet::IpNet;
use redis_rate_limiter::redis::AsyncCommands;
@@ -37,11 +37,11 @@ pub enum RateLimitResult {
/// contains the IP of the anonymous user
/// TODO: option inside or outside the arc?
AllowedIp(IpAddr, Option<OwnedSemaphorePermit>),
/// contains the user_key_id of an authenticated user
/// contains the rpc_key_id of an authenticated user
AllowedUser(UserKeyData, Option<OwnedSemaphorePermit>),
/// contains the IP and retry_at of the anonymous user
RateLimitedIp(IpAddr, Option<Instant>),
/// contains the user_key_id and retry_at of an authenticated user key
/// contains the rpc_key_id and retry_at of an authenticated user key
RateLimitedUser(UserKeyData, Option<Instant>),
/// This key is not in our database. Deny access!
UnknownKey,
@@ -52,7 +52,7 @@ pub struct AuthorizedKey {
pub ip: IpAddr,
pub origin: Option<Origin>,
pub user_id: u64,
pub user_key_id: u64,
pub rpc_key_id: u64,
// TODO: just use an f32? even an f16 is probably fine
pub log_revert_chance: Decimal,
}
@@ -178,10 +178,10 @@ impl AuthorizedKey {
origin: Option<Origin>,
referer: Option<Referer>,
user_agent: Option<UserAgent>,
user_key_data: UserKeyData,
rpc_key_data: UserKeyData,
) -> anyhow::Result<Self> {
// check ip
match &user_key_data.allowed_ips {
match &rpc_key_data.allowed_ips {
None => {}
Some(allowed_ips) => {
if !allowed_ips.iter().any(|x| x.contains(&ip)) {
@@ -191,7 +191,7 @@
}
// check origin
match (&origin, &user_key_data.allowed_origins) {
match (&origin, &rpc_key_data.allowed_origins) {
(None, None) => {}
(Some(_), None) => {}
(None, Some(_)) => return Err(anyhow::anyhow!("Origin required")),
@@ -203,7 +203,7 @@
}
// check referer
match (referer, &user_key_data.allowed_referers) {
match (referer, &rpc_key_data.allowed_referers) {
(None, None) => {}
(Some(_), None) => {}
(None, Some(_)) => return Err(anyhow::anyhow!("Referer required")),
@@ -215,7 +215,7 @@
}
// check user_agent
match (user_agent, &user_key_data.allowed_user_agents) {
match (user_agent, &rpc_key_data.allowed_user_agents) {
(None, None) => {}
(Some(_), None) => {}
(None, Some(_)) => return Err(anyhow::anyhow!("User agent required")),
@@ -229,9 +229,9 @@
Ok(Self {
ip,
origin,
user_id: user_key_data.user_id,
user_key_id: user_key_data.user_key_id,
log_revert_chance: user_key_data.log_revert_chance,
user_id: rpc_key_data.user_id,
rpc_key_id: rpc_key_data.rpc_key_id,
log_revert_chance: rpc_key_data.log_revert_chance,
})
}
}
@@ -251,7 +251,7 @@ impl Display for &AuthorizedRequest {
match self {
AuthorizedRequest::Internal => f.write_str("int"),
AuthorizedRequest::Ip(x, _) => f.write_str(&format!("ip-{}", x)),
AuthorizedRequest::User(_, x) => f.write_str(&format!("uk-{}", x.user_key_id)),
AuthorizedRequest::User(_, x) => f.write_str(&format!("uk-{}", x.rpc_key_id)),
}
}
}
@@ -296,14 +296,14 @@ pub async fn ip_is_authorized(
pub async fn key_is_authorized(
app: &Web3ProxyApp,
user_key: RpcApiKey,
rpc_key: RpcApiKey,
ip: IpAddr,
origin: Option<Origin>,
referer: Option<Referer>,
user_agent: Option<UserAgent>,
) -> Result<(AuthorizedRequest, Option<OwnedSemaphorePermit>), FrontendErrorResponse> {
// check the rate limits. error if over the limit
let (user_data, semaphore) = match app.rate_limit_by_key(user_key).await? {
let (user_data, semaphore) = match app.rate_limit_by_key(rpc_key).await? {
RateLimitResult::AllowedUser(x, semaphore) => (x, semaphore),
RateLimitResult::RateLimitedUser(x, retry_at) => {
return Err(FrontendErrorResponse::RateLimitedUser(x, retry_at));
@@ -321,6 +321,7 @@ pub async fn key_is_authorized(
}
impl Web3ProxyApp {
/// Limit the number of concurrent requests from the given ip address.
pub async fn ip_semaphore(&self, ip: IpAddr) -> anyhow::Result<Option<OwnedSemaphorePermit>> {
if let Some(max_concurrent_requests) = self.config.public_max_concurrent_requests {
let semaphore = self
@@ -345,16 +346,17 @@ impl Web3ProxyApp {
}
}
pub async fn user_key_semaphore(
/// Limit the number of concurrent requests from the given rpc key.
pub async fn user_rpc_key_semaphore(
&self,
user_data: &UserKeyData,
rpc_key_data: &UserKeyData,
) -> anyhow::Result<Option<OwnedSemaphorePermit>> {
if let Some(max_concurrent_requests) = user_data.max_concurrent_requests {
if let Some(max_concurrent_requests) = rpc_key_data.max_concurrent_requests {
let semaphore = self
.user_key_semaphores
.get_with(user_data.user_key_id, async move {
.rpc_key_semaphores
.get_with(rpc_key_data.rpc_key_id, async move {
let s = Semaphore::new(max_concurrent_requests as usize);
trace!("new semaphore for user_key_id {}", user_data.user_key_id);
trace!("new semaphore for rpc_key_id {}", rpc_key_data.rpc_key_id);
Arc::new(s)
})
.await;
@@ -493,30 +495,30 @@ impl Web3ProxyApp {
}
// check the local cache for user data, or query the database
pub(crate) async fn user_data(&self, user_key: RpcApiKey) -> anyhow::Result<UserKeyData> {
pub(crate) async fn user_data(&self, rpc_key: RpcApiKey) -> anyhow::Result<UserKeyData> {
let user_data: Result<_, Arc<anyhow::Error>> = self
.user_key_cache
.try_get_with(user_key.into(), async move {
trace!(?user_key, "user_cache miss");
.rpc_key_cache
.try_get_with(rpc_key.into(), async move {
trace!(?rpc_key, "user_cache miss");
let db_conn = self.db_conn().context("Getting database connection")?;
let user_uuid: Uuid = user_key.into();
let rpc_key: Uuid = rpc_key.into();
// TODO: join the user table to this to return the User? we don't always need it
// TODO: also attach secondary users
match user_keys::Entity::find()
.filter(user_keys::Column::ApiKey.eq(user_uuid))
.filter(user_keys::Column::Active.eq(true))
match rpc_keys::Entity::find()
.filter(rpc_keys::Column::RpcKey.eq(rpc_key))
.filter(rpc_keys::Column::Active.eq(true))
.one(&db_conn)
.await?
{
Some(user_key_model) => {
Some(rpc_key_model) => {
// TODO: move these splits into helper functions
// TODO: can we have sea orm handle this for us?
let allowed_ips: Option<Vec<IpNet>> =
if let Some(allowed_ips) = user_key_model.allowed_ips {
if let Some(allowed_ips) = rpc_key_model.allowed_ips {
let x = allowed_ips
.split(',')
.map(|x| x.parse::<IpNet>())
@@ -527,7 +529,7 @@ impl Web3ProxyApp {
};
let allowed_origins: Option<Vec<Origin>> =
if let Some(allowed_origins) = user_key_model.allowed_origins {
if let Some(allowed_origins) = rpc_key_model.allowed_origins {
// TODO: do this without collecting twice?
let x = allowed_origins
.split(',')
@@ -543,7 +545,7 @@ };
};
let allowed_referers: Option<Vec<Referer>> =
if let Some(allowed_referers) = user_key_model.allowed_referers {
if let Some(allowed_referers) = rpc_key_model.allowed_referers {
let x = allowed_referers
.split(',')
.map(|x| x.parse::<Referer>())
@@ -555,7 +557,7 @@ };
};
let allowed_user_agents: Option<Vec<UserAgent>> =
if let Some(allowed_user_agents) = user_key_model.allowed_user_agents {
if let Some(allowed_user_agents) = rpc_key_model.allowed_user_agents {
let x: Result<Vec<_>, _> = allowed_user_agents
.split(',')
.map(|x| x.parse::<UserAgent>())
@@ -567,15 +569,15 @@ };
};
Ok(UserKeyData {
user_id: user_key_model.user_id,
user_key_id: user_key_model.id,
max_requests_per_period: user_key_model.requests_per_minute,
max_concurrent_requests: user_key_model.max_concurrent_requests,
user_id: rpc_key_model.user_id,
rpc_key_id: rpc_key_model.id,
max_requests_per_period: rpc_key_model.requests_per_minute,
max_concurrent_requests: rpc_key_model.max_concurrent_requests,
allowed_ips,
allowed_origins,
allowed_referers,
allowed_user_agents,
log_revert_chance: user_key_model.log_revert_chance,
log_revert_chance: rpc_key_model.log_revert_chance,
})
}
None => Ok(UserKeyData::default()),
@@ -587,14 +589,14 @@
user_data.map_err(|err| anyhow::anyhow!(err))
}
pub async fn rate_limit_by_key(&self, user_key: RpcApiKey) -> anyhow::Result<RateLimitResult> {
let user_data = self.user_data(user_key).await?;
pub async fn rate_limit_by_key(&self, rpc_key: RpcApiKey) -> anyhow::Result<RateLimitResult> {
let user_data = self.user_data(rpc_key).await?;
if user_data.user_key_id == 0 {
if user_data.rpc_key_id == 0 {
return Ok(RateLimitResult::UnknownKey);
}
let semaphore = self.user_key_semaphore(&user_data).await?;
let semaphore = self.user_rpc_key_semaphore(&user_data).await?;
let user_max_requests_per_period = match user_data.max_requests_per_period {
None => {
@@ -606,7 +608,7 @@ impl Web3ProxyApp {
// user key is valid. now check rate limits
if let Some(rate_limiter) = &self.frontend_key_rate_limiter {
match rate_limiter
.throttle(user_key.into(), Some(user_max_requests_per_period), 1)
.throttle(rpc_key.into(), Some(user_max_requests_per_period), 1)
.await
{
Ok(DeferredRateLimitResult::Allowed) => {
@@ -618,12 +620,12 @@ impl Web3ProxyApp {
// this is too verbose, but a stat might be good
// TODO: keys are secrets! use the id instead
// TODO: emit a stat
trace!(?user_key, "rate limit exceeded until {:?}", retry_at);
trace!(?rpc_key, "rate limit exceeded until {:?}", retry_at);
Ok(RateLimitResult::RateLimitedUser(user_data, Some(retry_at)))
}
Ok(DeferredRateLimitResult::RetryNever) => {
// TODO: keys are secret. don't log them!
trace!(?user_key, "rate limit is 0");
trace!(?rpc_key, "rate limit is 0");
// TODO: emit a stat
Ok(RateLimitResult::RateLimitedUser(user_data, None))
}

@@ -55,11 +55,11 @@ pub async fn serve(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()
.route("/", post(rpc_proxy_http::proxy_web3_rpc))
.route("/", get(rpc_proxy_ws::websocket_handler))
.route(
"/rpc/:user_key",
"/rpc/:rpc_key",
post(rpc_proxy_http::proxy_web3_rpc_with_key),
)
.route(
"/rpc/:user_key",
"/rpc/:rpc_key",
get(rpc_proxy_ws::websocket_handler_with_key),
)
.route("/health", get(status::health))
@@ -73,8 +73,8 @@ pub async fn serve(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()
.route("/user", post(users::user_post))
.route("/user/balance", get(users::user_balance_get))
.route("/user/balance/:txid", post(users::user_balance_post))
.route("/user/keys", get(users::user_keys_get))
.route("/user/keys", post(users::user_keys_post))
.route("/user/keys", get(users::rpc_keys_get))
.route("/user/keys", post(users::rpc_keys_post))
.route("/user/revert_logs", get(users::user_revert_logs_get))
.route(
"/user/stats/aggregate",

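The renamed `:rpc_key` segment in the routes above is an axum path parameter; the handlers in the hunks that follow bind it with `Path(rpc_key): Path<String>`. A self-contained sketch of that binding pattern (hypothetical handler and router names, not the app's real setup):

```rust
use axum::{extract::Path, routing::get, Router};

// the :rpc_key path segment binds to Path<String>, exactly as in
// proxy_web3_rpc_with_key and websocket_handler_with_key below
async fn echo_key(Path(rpc_key): Path<String>) -> String {
    format!("got rpc key param: {}", rpc_key)
}

fn demo_router() -> Router {
    Router::new().route("/rpc/:rpc_key", get(echo_key))
}
```
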
@@ -32,7 +32,7 @@ pub async fn proxy_web3_rpc(
let authorized_request = Arc::new(authorized_request);
// TODO: spawn earlier?
// TODO: spawn earlier? i think we want ip_is_authorized in this future
let f = tokio::spawn(async move {
app.proxy_web3_rpc(authorized_request, payload)
.instrument(request_span)
@@ -56,15 +56,15 @@ pub async fn proxy_web3_rpc_with_key(
origin: Option<TypedHeader<Origin>>,
referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>,
Path(user_key): Path<String>,
Path(rpc_key): Path<String>,
) -> FrontendResult {
let user_key = user_key.parse()?;
let rpc_key = rpc_key.parse()?;
let request_span = error_span!("request", %ip, ?referer, ?user_agent);
let (authorized_request, _semaphore) = key_is_authorized(
&app,
user_key,
rpc_key,
ip,
origin.map(|x| x.0),
referer.map(|x| x.0),

@@ -78,19 +78,19 @@ pub async fn websocket_handler(
pub async fn websocket_handler_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>,
ClientIp(ip): ClientIp,
Path(user_key): Path<String>,
Path(rpc_key): Path<String>,
origin: Option<TypedHeader<Origin>>,
referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>,
ws_upgrade: Option<WebSocketUpgrade>,
) -> FrontendResult {
let user_key = user_key.parse()?;
let rpc_key = rpc_key.parse()?;
let request_span = error_span!("request", %ip, ?referer, ?user_agent);
let (authorized_request, _semaphore) = key_is_authorized(
&app,
user_key,
rpc_key,
ip,
origin.map(|x| x.0),
referer.map(|x| x.0),

@@ -5,10 +5,10 @@
use crate::app::Web3ProxyApp;
use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
use axum_macros::debug_handler;
use moka::future::ConcurrentCacheExt;
use serde_json::json;
use std::sync::Arc;
use axum_macros::debug_handler;
/// Health check page for load balancers to use.
#[debug_handler]
@@ -35,14 +35,14 @@ pub async fn prometheus(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl In
#[debug_handler]
pub async fn status(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
app.pending_transactions.sync();
app.user_key_cache.sync();
app.rpc_key_cache.sync();
// TODO: what else should we include? uptime, cache hit rates, cpu load
let body = json!({
"pending_transactions_count": app.pending_transactions.entry_count(),
"pending_transactions_size": app.pending_transactions.weighted_size(),
"user_cache_count": app.user_key_cache.entry_count(),
"user_cache_size": app.user_key_cache.weighted_size(),
"user_cache_count": app.rpc_key_cache.entry_count(),
"user_cache_size": app.rpc_key_cache.weighted_size(),
"balanced_rpcs": app.balanced_rpcs,
"private_rpcs": app.private_rpcs,
});

@@ -17,7 +17,7 @@ use axum::{
};
use axum_client_ip::ClientIp;
use axum_macros::debug_handler;
use entities::{revert_logs, user, user_keys};
use entities::{revert_logs, rpc_keys, user};
use ethers::{prelude::Address, types::Bytes};
use hashbrown::HashMap;
use http::{HeaderValue, StatusCode};
@@ -260,9 +260,9 @@ pub async fn user_login_post(
let rpc_key = RpcApiKey::new();
// TODO: variable requests per minute depending on the invite code
let uk = user_keys::ActiveModel {
let uk = rpc_keys::ActiveModel {
user_id: sea_orm::Set(u.id),
api_key: sea_orm::Set(rpc_key.into()),
rpc_key: sea_orm::Set(rpc_key.into()),
description: sea_orm::Set(Some("first".to_string())),
requests_per_minute: sea_orm::Set(app.config.default_user_requests_per_minute),
..Default::default()
@@ -282,8 +282,8 @@
}
Some(u) => {
// the user is already registered
let uks = user_keys::Entity::find()
.filter(user_keys::Column::UserId.eq(u.id))
let uks = rpc_keys::Entity::find()
.filter(rpc_keys::Column::UserId.eq(u.id))
.all(&db_conn)
.await
.context("failed loading user's key")?;
@@ -298,7 +298,7 @@ pub async fn user_login_post(
// json response with everything in it
// we could return just the bearer token, but I think they will always request api keys and the user profile
let response_json = json!({
"api_keys": uks
"rpc_keys": uks
.into_iter()
.map(|uk| (uk.id, uk))
.collect::<HashMap<_, _>>(),
@@ -411,7 +411,7 @@ pub async fn user_post(
/// - show balance in USD
/// - show deposits history (currency, amounts, transaction id)
///
/// TODO: one key per request? maybe /user/balance/:api_key?
/// TODO: one key per request? maybe /user/balance/:rpc_key?
/// TODO: this will change as we add better support for secondary users.
#[debug_handler]
pub async fn user_balance_get(
@@ -428,7 +428,7 @@ pub async fn user_balance_get(
/// We will subscribe to events to watch for any user deposits, but sometimes events can be missed.
///
/// TODO: rate limit by user
/// TODO: one key per request? maybe /user/balance/:api_key?
/// TODO: one key per request? maybe /user/balance/:rpc_key?
/// TODO: this will change as we add better support for secondary users.
#[debug_handler]
pub async fn user_balance_post(
@@ -442,9 +442,9 @@ pub async fn user_balance_post(
/// `GET /user/keys` -- Use a bearer token to get the user's api keys and their settings.
///
/// TODO: one key per request? maybe /user/keys/:api_key?
/// TODO: one key per request? maybe /user/keys/:rpc_key?
#[debug_handler]
pub async fn user_keys_get(
pub async fn rpc_keys_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
) -> FrontendResult {
@@ -452,8 +452,8 @@ pub async fn user_keys_get(
let db_conn = app.db_conn().context("getting db to fetch user's keys")?;
let uks = user_keys::Entity::find()
.filter(user_keys::Column::UserId.eq(user.id))
let uks = rpc_keys::Entity::find()
.filter(rpc_keys::Column::UserId.eq(user.id))
.all(&db_conn)
.await
.context("failed loading user's key")?;
@@ -470,7 +470,7 @@ pub async fn user_keys_get(
Ok(Json(response_json).into_response())
}
/// the JSON input to the `user_keys_post` handler.
/// the JSON input to the `rpc_keys_post` handler.
#[derive(Deserialize)]
pub struct UserKeysPost {
// TODO: make sure the email address is valid. probably have a "verified" column in the database
@@ -491,9 +491,9 @@ pub struct UserKeysPost {
/// `POST /user/keys` -- Use a bearer token to create a new key or modify an existing key.
///
/// TODO: read json from the request body
/// TODO: one key per request? maybe /user/keys/:api_key?
/// TODO: one key per request? maybe /user/keys/:rpc_key?
#[debug_handler]
pub async fn user_keys_post(
pub async fn rpc_keys_post(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
Json(payload): Json<UserKeysPost>,
@@ -504,9 +504,9 @@ pub async fn user_keys_post(
let mut uk = if let Some(existing_key_id) = payload.existing_key_id {
// get the key and make sure it belongs to the user
let uk = user_keys::Entity::find()
.filter(user_keys::Column::UserId.eq(user.id))
.filter(user_keys::Column::Id.eq(existing_key_id))
let uk = rpc_keys::Entity::find()
.filter(rpc_keys::Column::UserId.eq(user.id))
.filter(rpc_keys::Column::Id.eq(existing_key_id))
.one(&db_conn)
.await
.context("failed loading user's key")?
@@ -515,9 +515,9 @@ pub async fn user_keys_post(
uk.try_into().unwrap()
} else if let Some(existing_key) = payload.existing_key {
// get the key and make sure it belongs to the user
let uk = user_keys::Entity::find()
.filter(user_keys::Column::UserId.eq(user.id))
.filter(user_keys::Column::ApiKey.eq(Uuid::from(existing_key)))
let uk = rpc_keys::Entity::find()
.filter(rpc_keys::Column::UserId.eq(user.id))
.filter(rpc_keys::Column::RpcKey.eq(Uuid::from(existing_key)))
.one(&db_conn)
.await
.context("failed loading user's key")?
@@ -529,9 +529,9 @@ pub async fn user_keys_post(
// TODO: limit to 10 keys?
let rpc_key = RpcApiKey::new();
user_keys::ActiveModel {
rpc_keys::ActiveModel {
user_id: sea_orm::Set(user.id),
api_key: sea_orm::Set(rpc_key.into()),
rpc_key: sea_orm::Set(rpc_key.into()),
requests_per_minute: sea_orm::Set(app.config.default_user_requests_per_minute),
..Default::default()
}
@@ -661,7 +661,7 @@ pub async fn user_keys_post(
uk
};
let uk: user_keys::Model = uk.try_into()?;
let uk: rpc_keys::Model = uk.try_into()?;
Ok(Json(uk).into_response())
}
@@ -691,8 +691,8 @@ pub async fn user_revert_logs_get(
let db_conn = app.db_conn().context("getting db for user's revert logs")?;
let uks = user_keys::Entity::find()
.filter(user_keys::Column::UserId.eq(user.id))
let uks = rpc_keys::Entity::find()
.filter(rpc_keys::Column::UserId.eq(user.id))
.all(&db_conn)
.await
.context("failed loading user's key")?;
@@ -703,7 +703,7 @@ pub async fn user_revert_logs_get(
// get paginated logs
let q = revert_logs::Entity::find()
.filter(revert_logs::Column::Timestamp.gte(query_start))
.filter(revert_logs::Column::UserKeyId.is_in(uks))
.filter(revert_logs::Column::RpcKeyId.is_in(uks))
.order_by_asc(revert_logs::Column::Timestamp);
let q = if chain_id == 0 {

@@ -65,7 +65,7 @@ impl Web3Connections {
return Ok(());
}
info!(%block_hash, %block_num, "saving new block");
trace!(%block_hash, %block_num, "saving new block");
self.block_hashes
.insert(*block_hash, block.to_owned())

@@ -95,7 +95,7 @@ impl AuthorizedRequest {
let call_data = params.data.map(|x| format!("{}", x));
let rl = revert_logs::ActiveModel {
user_key_id: sea_orm::Set(authorized_request.user_key_id),
rpc_key_id: sea_orm::Set(authorized_request.rpc_key_id),
method: sea_orm::Set(method),
to: sea_orm::Set(to),
call_data: sea_orm::Set(call_data),

@@ -4,7 +4,7 @@ use axum::{
TypedHeader,
};
use chrono::NaiveDateTime;
use entities::{rpc_accounting, user_keys};
use entities::{rpc_accounting, rpc_keys};
use hashbrown::HashMap;
use migration::Expr;
use num::Zero;
@@ -36,7 +36,7 @@ async fn get_user_id_from_params(
.get::<_, u64>(bearer_cache_key)
.await
// TODO: this should be a 403
.context("fetching user_key_id from redis with bearer_cache_key")
.context("fetching rpc_key_id from redis with bearer_cache_key")
}
(_, None) => {
// they have a bearer token. we don't care about it on public pages
@@ -53,15 +53,15 @@
}
}
/// only allow user_key to be set if user_id is also set.
/// only allow rpc_key to be set if user_id is also set.
/// this will keep people from reading someone else's keys.
/// 0 means none.
pub fn get_user_key_id_from_params(
pub fn get_rpc_key_id_from_params(
user_id: u64,
params: &HashMap<String, String>,
) -> anyhow::Result<u64> {
if user_id > 0 {
params.get("user_key_id").map_or_else(
params.get("rpc_key_id").map_or_else(
|| Ok(0),
|c| {
let c = c.parse()?;
@@ -266,21 +266,18 @@ pub async fn get_aggregate_rpc_stats_from_params(
// TODO: are these joins correct?
// TODO: what about keys where they are the secondary users?
let q = q
.join(
JoinType::InnerJoin,
rpc_accounting::Relation::UserKeys.def(),
)
.column(user_keys::Column::UserId)
.group_by(user_keys::Column::UserId);
.join(JoinType::InnerJoin, rpc_accounting::Relation::RpcKeys.def())
.column(rpc_keys::Column::UserId)
.group_by(rpc_keys::Column::UserId);
let condition = condition.add(user_keys::Column::UserId.eq(user_id));
let condition = condition.add(rpc_keys::Column::UserId.eq(user_id));
(condition, q)
};
let q = q.filter(condition);
// TODO: enum between searching on user_key_id on user_id
// TODO: enum between searching on rpc_key_id or on user_id
// TODO: handle secondary users, too
// log query here. i think sea orm has a useful log level for this
@@ -306,7 +303,7 @@ pub async fn get_detailed_stats(
let redis_conn = app.redis_conn().await.context("connecting to redis")?;
let user_id = get_user_id_from_params(redis_conn, bearer, &params).await?;
let user_key_id = get_user_key_id_from_params(user_id, &params)?;
let rpc_key_id = get_rpc_key_id_from_params(user_id, &params)?;
let chain_id = get_chain_id_from_params(app, &params)?;
let query_start = get_query_start_from_params(&params)?;
let query_window_seconds = get_query_window_seconds_from_params(&params)?;
@@ -390,24 +387,21 @@ pub async fn get_detailed_stats(
// TODO: move authentication here?
// TODO: what about keys where this user is a secondary user?
let q = q
.join(
JoinType::InnerJoin,
rpc_accounting::Relation::UserKeys.def(),
)
.column(user_keys::Column::UserId)
.group_by(user_keys::Column::UserId);
.join(JoinType::InnerJoin, rpc_accounting::Relation::RpcKeys.def())
.column(rpc_keys::Column::UserId)
.group_by(rpc_keys::Column::UserId);
let condition = condition.add(user_keys::Column::UserId.eq(user_id));
let condition = condition.add(rpc_keys::Column::UserId.eq(user_id));
let q = if user_key_id == 0 {
q.column(user_keys::Column::UserId)
.group_by(user_keys::Column::UserId)
let q = if rpc_key_id == 0 {
q.column(rpc_keys::Column::UserId)
.group_by(rpc_keys::Column::UserId)
} else {
response.insert("user_key_id", serde_json::to_value(user_key_id)?);
response.insert("rpc_key_id", serde_json::to_value(rpc_key_id)?);
// no need to group_by user_id when we are grouping by key_id
q.column(user_keys::Column::Id)
.group_by(user_keys::Column::Id)
q.column(rpc_keys::Column::Id)
.group_by(rpc_keys::Column::Id)
};
(condition, q)