move pending logins to the database

This commit is contained in:
Bryan Stitt 2022-12-13 18:13:23 -08:00
parent 01faf8cf35
commit 081873e937
28 changed files with 541 additions and 173 deletions

86
Cargo.lock generated

@ -343,9 +343,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "axum"
version = "0.5.17"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acee9fd5073ab6b045a275b3e709c163dd36c90685219cb21804a147b58dba43"
checksum = "08b108ad2665fa3f6e6a517c3d80ec3e77d224c47d605167aefaa5d7ef97fa48"
dependencies = [
"async-trait",
"axum-core",
@ -363,8 +363,10 @@ dependencies = [
"mime",
"percent-encoding",
"pin-project-lite",
"rustversion",
"serde",
"serde_json",
"serde_path_to_error",
"serde_urlencoded",
"sha-1",
"sync_wrapper",
@ -378,9 +380,9 @@ dependencies = [
[[package]]
name = "axum-client-ip"
version = "0.2.0"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92ba6eab8967a7b1121e3cd7491a205dff7a33bb05e65df6c199d687a32e4d3b"
checksum = "a27e888320b2a506263b84ef2f147a68bec582e49b7c6ab8246174aa196e18c8"
dependencies = [
"axum",
"forwarded-header-value",
@ -388,9 +390,9 @@ dependencies = [
[[package]]
name = "axum-core"
version = "0.2.9"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37e5939e02c56fecd5c017c37df4238c0a839fa76b7f97acdd7efb804fd181cc"
checksum = "79b8558f5a0581152dc94dcd289132a1d377494bdeafcd41869b3258e3e2ad92"
dependencies = [
"async-trait",
"bytes",
@ -398,15 +400,16 @@ dependencies = [
"http",
"http-body",
"mime",
"rustversion",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum-macros"
version = "0.2.3"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6293dae2ec708e679da6736e857cf8532886ef258e92930f38279c12641628b8"
checksum = "e4df0fc33ada14a338b799002f7e8657711422b25d4e16afb032708d6b185621"
dependencies = [
"heck 0.4.0",
"proc-macro2",
@ -1106,9 +1109,9 @@ dependencies = [
[[package]]
name = "deadpool-redis"
version = "0.11.0"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8cb2945e05eeebc0f99795a135b2b15c038646d8292946621840d3cbff296e20"
checksum = "2b8bde44cbfdf17ae5baa45c9f43073b320f1a19955389315629304a23909ad2"
dependencies = [
"deadpool",
"redis",
@ -1347,10 +1350,12 @@ dependencies = [
[[package]]
name = "entities"
version = "0.11.0"
version = "0.12.0"
dependencies = [
"ethers",
"sea-orm",
"serde",
"ulid",
"uuid 1.2.2",
]
@ -1502,6 +1507,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "002a0d58a7d921b496f5f19b5b9508d01d25fbe25078286b1fcb6f4e7562acf7"
dependencies = [
"ethers-contract-abigen",
"ethers-contract-derive",
"ethers-core",
"ethers-providers",
"futures-util",
@ -1537,6 +1543,21 @@ dependencies = [
"walkdir",
]
[[package]]
name = "ethers-contract-derive"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f192e8e4cf2b038318aae01e94e7644e0659a76219e94bcd3203df744341d61f"
dependencies = [
"ethers-contract-abigen",
"ethers-core",
"hex",
"proc-macro2",
"quote",
"serde_json",
"syn",
]
[[package]]
name = "ethers-core"
version = "1.0.0"
@ -2461,9 +2482,9 @@ dependencies = [
[[package]]
name = "ipnet"
version = "2.5.1"
version = "2.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f88c5561171189e69df9d98bcf18fd5f9558300f7ea7b801eb8a0fd748bd8745"
checksum = "11b0d96e660696543b251e58030cf9787df56da39dab19ad60eae7353040917e"
[[package]]
name = "iri-string"
@ -2660,9 +2681,9 @@ dependencies = [
[[package]]
name = "matchit"
version = "0.5.0"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb"
checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40"
[[package]]
name = "md-5"
@ -2720,7 +2741,7 @@ dependencies = [
[[package]]
name = "migration"
version = "0.11.0"
version = "0.12.0"
dependencies = [
"sea-orm-migration",
"tokio",
@ -3865,9 +3886,9 @@ dependencies = [
[[package]]
name = "rustversion"
version = "1.0.6"
version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2cc38e8fa666e2de3c4aba7edeb5ffc5246c1c2ed0e3d17e560aeeba736b23f"
checksum = "97477e48b4cf8603ad5f7aaf897467cf42ab4218a38ef76fb14c2d6773a6d6a8"
[[package]]
name = "ryu"
@ -4252,9 +4273,9 @@ dependencies = [
[[package]]
name = "serde"
version = "1.0.149"
version = "1.0.150"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "256b9932320c590e707b94576e3cc1f7c9024d0ee6612dfbcf1cb106cbe8e055"
checksum = "e326c9ec8042f1b5da33252c8a37e9ffbd2c9bef0155215b6e6c80c790e05f91"
dependencies = [
"serde_derive",
]
@ -4271,9 +4292,9 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.149"
version = "1.0.150"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4eae9b04cbffdfd550eb462ed33bc6a1b68c935127d008b27444d08380f94e4"
checksum = "42a3df25b0713732468deadad63ab9da1f1fd75a48a15024b50363f128db627e"
dependencies = [
"proc-macro2",
"quote",
@ -4291,6 +4312,15 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_path_to_error"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "184c643044780f7ceb59104cef98a5a6f12cb2288a7bc701ab93a362b49fd47d"
dependencies = [
"serde",
]
[[package]]
name = "serde_prometheus"
version = "0.1.6"
@ -4953,9 +4983,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "1.22.0"
version = "1.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d76ce4a75fb488c605c54bf610f221cea8b0dafb53333c1a67e8ee199dcd2ae3"
checksum = "eab6d665857cc6ca78d6e80303a02cea7a7851e85dfbd77cbdc09bd129f1ef46"
dependencies = [
"autocfg",
"bytes",
@ -4969,7 +4999,7 @@ dependencies = [
"socket2",
"tokio-macros",
"tracing",
"winapi",
"windows-sys 0.42.0",
]
[[package]]
@ -5082,9 +5112,9 @@ dependencies = [
[[package]]
name = "tower-layer"
version = "0.3.1"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62"
checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0"
[[package]]
name = "tower-service"
@ -5499,7 +5529,7 @@ dependencies = [
[[package]]
name = "web3_proxy"
version = "0.11.0"
version = "0.12.0"
dependencies = [
"anyhow",
"arc-swap",

@ -11,4 +11,4 @@ anyhow = "1.0.66"
hashbrown = "0.13.1"
log = "0.4.17"
moka = { version = "0.9.6", default-features = false, features = ["future"] }
tokio = "1.22.0"
tokio = "1.23.0"

@ -1,6 +1,6 @@
[package]
name = "entities"
version = "0.11.0"
version = "0.12.0"
edition = "2021"
[lib]
@ -11,5 +11,7 @@ path = "src/mod.rs"
[dependencies]
sea-orm = "0.10.5"
serde = "1.0.149"
serde = "1.0.150"
uuid = "1.2.2"
ethers = "1.0.2"
ulid = "1.0.0"

37
entities/src/login.rs Normal file

@ -0,0 +1,37 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.5
use crate::serialization;
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "login")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: u64,
#[sea_orm(unique)]
#[serde(serialize_with = "serialization::uuid_as_ulid")]
pub bearer_token: Uuid,
pub user_id: u64,
pub expires_at: DateTimeUtc,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::user::Entity",
from = "Column::UserId",
to = "super::user::Column::Id",
on_update = "NoAction",
on_delete = "NoAction"
)]
User,
}
impl Related<super::user::Entity> for Entity {
fn to() -> RelationDef {
Relation::User.def()
}
}
impl ActiveModelBehavior for ActiveModel {}

@ -1,11 +1,14 @@
//! SeaORM Entity. Generated by sea-orm-codegen 0.10.1
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.5
pub mod prelude;
pub mod login;
pub mod pending_login;
pub mod revert_log;
pub mod rpc_accounting;
pub mod rpc_key;
pub mod sea_orm_active_enums;
pub mod secondary_user;
pub mod serialization;
pub mod user;
pub mod user_tier;

@ -0,0 +1,23 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.5
use crate::serialization;
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "pending_login")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: u64,
#[sea_orm(unique)]
#[serde(serialize_with = "serialization::uuid_as_ulid")]
pub nonce: Uuid,
#[sea_orm(column_type = "Text")]
pub message: String,
pub expires_at: DateTimeUtc,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}

@ -1,5 +1,7 @@
//! SeaORM Entity. Generated by sea-orm-codegen 0.10.1
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.5
pub use super::login::Entity as Login;
pub use super::pending_login::Entity as PendingLogin;
pub use super::revert_log::Entity as RevertLog;
pub use super::rpc_accounting::Entity as RpcAccounting;
pub use super::rpc_key::Entity as RpcKey;

@ -1,6 +1,7 @@
//! SeaORM Entity. Generated by sea-orm-codegen 0.10.1
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.5
use super::sea_orm_active_enums::Method;
use crate::serialization;
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
@ -12,6 +13,7 @@ pub struct Model {
pub rpc_key_id: u64,
pub timestamp: DateTimeUtc,
pub method: Method,
#[serde(serialize_with = "serialization::vec_as_address")]
pub to: Vec<u8>,
#[sea_orm(column_type = "Text", nullable)]
pub call_data: Option<String>,

@ -1,4 +1,4 @@
//! SeaORM Entity. Generated by sea-orm-codegen 0.10.1
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.5
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
@ -11,13 +11,10 @@ pub struct Model {
pub rpc_key_id: Option<u64>,
pub chain_id: u64,
pub method: Option<String>,
pub archive_request: bool,
pub error_response: bool,
pub period_datetime: DateTimeUtc,
pub frontend_requests: u64,
pub backend_requests: u64,
// pub backend_retries: u64,
// pub no_servers: u64,
pub cache_misses: u64,
pub cache_hits: u64,
pub sum_request_bytes: u64,
@ -41,6 +38,7 @@ pub struct Model {
pub p90_response_bytes: u64,
pub p99_response_bytes: u64,
pub max_response_bytes: u64,
pub archive_request: bool,
pub origin: Option<String>,
}

@ -1,6 +1,7 @@
//! SeaORM Entity. Generated by sea-orm-codegen 0.10.1
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.5
use super::sea_orm_active_enums::LogLevel;
use crate::serialization;
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
@ -11,12 +12,11 @@ pub struct Model {
pub id: u64,
pub user_id: u64,
#[sea_orm(unique)]
#[serde(serialize_with = "serialization::uuid_as_ulid")]
pub secret_key: Uuid,
pub description: Option<String>,
pub private_txs: bool,
pub active: bool,
pub log_level: LogLevel,
pub log_revert_chance: f64,
#[sea_orm(column_type = "Text", nullable)]
pub allowed_ips: Option<String>,
#[sea_orm(column_type = "Text", nullable)]
@ -25,16 +25,16 @@ pub struct Model {
pub allowed_referers: Option<String>,
#[sea_orm(column_type = "Text", nullable)]
pub allowed_user_agents: Option<String>,
}
pub enum RpcKeyLogLevels {
None,
Basic,
Detailed,
pub log_revert_chance: f64,
pub log_level: LogLevel,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(has_many = "super::revert_log::Entity")]
RevertLog,
#[sea_orm(has_many = "super::rpc_accounting::Entity")]
RpcAccounting,
#[sea_orm(
belongs_to = "super::user::Entity",
from = "Column::UserId",
@ -43,16 +43,6 @@ pub enum Relation {
on_delete = "NoAction"
)]
User,
#[sea_orm(has_many = "super::revert_log::Entity")]
RevertLog,
#[sea_orm(has_many = "super::rpc_accounting::Entity")]
RpcAccounting,
}
impl Related<super::user::Entity> for Entity {
fn to() -> RelationDef {
Relation::User.def()
}
}
impl Related<super::revert_log::Entity> for Entity {
@ -67,4 +57,10 @@ impl Related<super::rpc_accounting::Entity> for Entity {
}
}
impl Related<super::user::Entity> for Entity {
fn to() -> RelationDef {
Relation::User.def()
}
}
impl ActiveModelBehavior for ActiveModel {}

@ -1,11 +1,10 @@
//! SeaORM Entity. Generated by sea-orm-codegen 0.10.1
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.5
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "log_level")]
#[strum(ascii_case_insensitive)]
pub enum LogLevel {
#[sea_orm(string_value = "none")]
None,
@ -31,7 +30,6 @@ pub enum Method {
#[sea_orm(string_value = "eth_sendRawTransaction")]
EthSendRawTransaction,
}
#[derive(Debug, Clone, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "role")]
pub enum Role {

@ -1,4 +1,4 @@
//! SeaORM Entity. Generated by sea-orm-codegen 0.10.1
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.5
use super::sea_orm_active_enums::Role;
use sea_orm::entity::prelude::*;

@ -0,0 +1,30 @@
//! sea-orm types don't always serialize how we want. this helps that, though it won't help every case.
use ethers::prelude::Address;
use sea_orm::prelude::Uuid;
use serde::{Serialize, Serializer};
use std::convert::TryInto;
use ulid::Ulid;
pub fn to_fixed_length<T, const N: usize>(v: Vec<T>) -> [T; N] {
v.try_into()
.unwrap_or_else(|v: Vec<T>| panic!("Expected a Vec of length {} but it was {}", N, v.len()))
}
pub fn vec_as_address<S>(x: &[u8], s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let x = Address::from_slice(x);
x.serialize(s)
}
pub fn uuid_as_ulid<S>(x: &Uuid, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let x = Ulid::from(x.as_u128());
// TODO: to_string shouldn't be needed, but i'm still seeing Uuid length
x.to_string().serialize(s)
}

@ -1,5 +1,6 @@
//! SeaORM Entity. Generated by sea-orm-codegen 0.10.1
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.5
use crate::serialization;
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
@ -9,6 +10,7 @@ pub struct Model {
#[sea_orm(primary_key)]
pub id: u64,
#[sea_orm(unique)]
#[serde(serialize_with = "serialization::vec_as_address")]
pub address: Vec<u8>,
pub description: Option<String>,
pub email: Option<String>,
@ -17,6 +19,12 @@ pub struct Model {
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(has_many = "super::login::Entity")]
Login,
#[sea_orm(has_many = "super::rpc_key::Entity")]
RpcKey,
#[sea_orm(has_many = "super::secondary_user::Entity")]
SecondaryUser,
#[sea_orm(
belongs_to = "super::user_tier::Entity",
from = "Column::UserTierId",
@ -25,15 +33,11 @@ pub enum Relation {
on_delete = "NoAction"
)]
UserTier,
#[sea_orm(has_many = "super::rpc_key::Entity")]
RpcKey,
#[sea_orm(has_many = "super::secondary_user::Entity")]
SecondaryUser,
}
impl Related<super::user_tier::Entity> for Entity {
impl Related<super::login::Entity> for Entity {
fn to() -> RelationDef {
Relation::UserTier.def()
Relation::Login.def()
}
}
@ -49,4 +53,10 @@ impl Related<super::secondary_user::Entity> for Entity {
}
}
impl Related<super::user_tier::Entity> for Entity {
fn to() -> RelationDef {
Relation::UserTier.def()
}
}
impl ActiveModelBehavior for ActiveModel {}

@ -1,4 +1,4 @@
//! SeaORM Entity. Generated by sea-orm-codegen 0.10.1
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.5
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

@ -1,6 +1,6 @@
[package]
name = "migration"
version = "0.11.0"
version = "0.12.0"
edition = "2021"
publish = false
@ -9,7 +9,7 @@ name = "migration"
path = "src/lib.rs"
[dependencies]
tokio = { version = "1.22.0", features = ["full", "tracing"] }
tokio = { version = "1.23.0", features = ["full", "tracing"] }
[dependencies.sea-orm-migration]
version = "0.10.5"

@ -11,6 +11,7 @@ mod m20221031_211916_clean_up;
mod m20221101_222349_archive_request;
mod m20221108_200345_save_anon_stats;
mod m20221211_124002_request_method_privacy;
mod m20221213_134158_move_login_into_database;
pub struct Migrator;
@ -29,6 +30,7 @@ impl MigratorTrait for Migrator {
Box::new(m20221101_222349_archive_request::Migration),
Box::new(m20221108_200345_save_anon_stats::Migration),
Box::new(m20221211_124002_request_method_privacy::Migration),
Box::new(m20221213_134158_move_login_into_database::Migration),
]
}
}

@ -0,0 +1,107 @@
use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(PendingLogin::Table)
.col(
ColumnDef::new(PendingLogin::Id)
.big_unsigned()
.not_null()
.auto_increment()
.primary_key(),
)
.col(
ColumnDef::new(PendingLogin::Nonce)
.uuid()
.not_null()
.unique_key(),
)
.col(ColumnDef::new(PendingLogin::Message).text().not_null())
.col(
ColumnDef::new(PendingLogin::ExpiresAt)
.timestamp()
.not_null(),
)
.to_owned(),
)
.await?;
manager
.create_table(
Table::create()
.table(Login::Table)
.col(
ColumnDef::new(Login::Id)
.big_unsigned()
.not_null()
.auto_increment()
.primary_key(),
)
.col(
ColumnDef::new(Login::BearerToken)
.uuid()
.not_null()
.unique_key(),
)
.col(ColumnDef::new(Login::UserId).big_unsigned().not_null())
.col(
ColumnDef::new(PendingLogin::ExpiresAt)
.timestamp()
.not_null(),
)
.foreign_key(
ForeignKeyCreateStatement::new()
.from_col(Login::UserId)
.to_tbl(User::Table)
.to_col(User::Id),
)
.to_owned(),
)
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(Table::drop().table(PendingLogin::Table).to_owned())
.await?;
manager
.drop_table(Table::drop().table(Login::Table).to_owned())
.await?;
Ok(())
}
}
/// Partial table definition
#[derive(Iden)]
pub enum User {
Table,
Id,
}
#[derive(Iden)]
enum PendingLogin {
Table,
Id,
Nonce,
Message,
ExpiresAt,
}
#[derive(Iden)]
enum Login {
Table,
Id,
BearerToken,
UserId,
}

@ -6,5 +6,5 @@ edition = "2021"
[dependencies]
anyhow = "1.0.66"
deadpool-redis = { version = "0.11.0", features = ["rt_tokio_1", "serde"] }
tokio = "1.22.0"
deadpool-redis = { version = "0.11.1", features = ["rt_tokio_1", "serde"] }
tokio = "1.23.0"

@ -1,6 +1,6 @@
[package]
name = "web3_proxy"
version = "0.11.0"
version = "0.12.0"
edition = "2021"
default-run = "web3_proxy"
@ -22,9 +22,9 @@ thread-fast-rng = { path = "../thread-fast-rng" }
anyhow = { version = "1.0.66", features = ["backtrace"] }
arc-swap = "1.5.1"
argh = "0.1.9"
axum = { version = "0.5.17", features = ["headers", "serde_json", "tokio-tungstenite", "ws"] }
axum-client-ip = "0.2.0"
axum-macros = "0.2.3"
axum = { version = "0.6.1", features = ["headers", "ws"] }
axum-client-ip = "0.3.0"
axum-macros = "0.3.0"
# TODO: import chrono from sea-orm so we always have the same version
chrono = "0.4.23"
counter = "0.5.7"
@ -38,7 +38,7 @@ futures = { version = "0.3.25", features = ["thread-pool"] }
hashbrown = { version = "0.13.1", features = ["serde"] }
hdrhistogram = "7.5.2"
http = "0.2.8"
ipnet = "2.5.1"
ipnet = "2.7.0"
log = "0.4.17"
metered = { version = "0.9.0", features = ["serialize"] }
moka = { version = "0.9.6", default-features = false, features = ["future"] }
@ -55,12 +55,12 @@ handlebars = "4.3.5"
rustc-hash = "1.1.0"
siwe = "0.5.0"
sentry = { version = "0.29.1", default-features = false, features = ["backtrace", "contexts", "panic", "anyhow", "reqwest", "rustls", "log", "sentry-log"] }
serde = { version = "1.0.149", features = [] }
serde = { version = "1.0.150", features = [] }
serde_json = { version = "1.0.89", default-features = false, features = ["alloc", "raw_value"] }
serde_prometheus = "0.1.6"
# TODO: make sure this time version matches siwe. PR to put this in their prelude
time = "0.3.17"
tokio = { version = "1.22.0", features = ["full"] }
tokio = { version = "1.23.0", features = ["full"] }
# TODO: make sure this uuid version matches sea-orm. PR to put this in their prelude
tokio-stream = { version = "0.1.11", features = ["sync"] }
toml = "0.5.9"

@ -13,6 +13,7 @@ use crate::rpcs::blockchain::{ArcBlock, SavedBlock};
use crate::rpcs::connections::Web3Connections;
use crate::rpcs::request::OpenRequestHandleMetrics;
use crate::rpcs::transactions::TxStatus;
use crate::user_token::UserBearerToken;
use anyhow::Context;
use axum::headers::{Origin, Referer, UserAgent};
use deferred_rate_limiter::DeferredRateLimiter;
@ -123,7 +124,7 @@ pub struct Web3ProxyApp {
Cache<NonZeroU64, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
pub ip_semaphores: Cache<IpAddr, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
pub bearer_token_semaphores:
Cache<String, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
Cache<UserBearerToken, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
pub stat_sender: Option<flume::Sender<Web3ProxyStat>>,
}

@ -8,13 +8,12 @@ use axum::headers::authorization::Bearer;
use axum::headers::{Header, Origin, Referer, UserAgent};
use chrono::Utc;
use deferred_rate_limiter::DeferredRateLimitResult;
use entities::{rpc_key, user, user_tier};
use entities::{login, rpc_key, user, user_tier};
use hashbrown::HashMap;
use http::HeaderValue;
use ipnet::IpNet;
use log::error;
use migration::sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::RedisRateLimitResult;
use std::fmt::Display;
use std::sync::atomic::{AtomicBool, AtomicU64};
@ -422,11 +421,14 @@ impl Web3ProxyApp {
pub async fn bearer_is_authorized(
&self,
bearer: Bearer,
) -> anyhow::Result<(user::Model, OwnedSemaphorePermit)> {
) -> Result<(user::Model, OwnedSemaphorePermit), FrontendErrorResponse> {
// get the user id for this bearer token
let user_bearer_token = UserBearerToken::try_from(bearer)?;
// limit concurrent requests
let semaphore = self
.bearer_token_semaphores
.get_with(bearer.token().to_string(), async move {
.get_with(user_bearer_token.clone(), async move {
let s = Semaphore::new(self.config.bearer_token_max_concurrent_requests as usize);
Arc::new(s)
})
@ -434,26 +436,20 @@ impl Web3ProxyApp {
let semaphore_permit = semaphore.acquire_owned().await?;
// get the user id for this bearer token
// TODO: move redis key building to a helper function
let bearer_cache_key = UserBearerToken::try_from(bearer)?.to_string();
// get the attached address from the database for the given auth_token.
let db_conn = self
.db_conn()
.context("checking if bearer token is authorized")?;
// get the attached address from redis for the given auth_token.
let mut redis_conn = self.redis_conn().await?;
let user_bearer_uuid: Uuid = user_bearer_token.into();
let user_id: u64 = redis_conn
.get::<_, Option<u64>>(bearer_cache_key)
.await
.context("fetching bearer cache key from redis")?
.context("unknown bearer token")?;
// turn user id into a user
let db_conn = self.db_conn().context("Getting database connection")?;
let user = user::Entity::find_by_id(user_id)
let user = user::Entity::find()
.left_join(login::Entity)
.filter(login::Column::BearerToken.eq(user_bearer_uuid))
.one(&db_conn)
.await
.context("fetching user from db by id")?
.context("unknown user id")?;
.context("fetching user from db by bearer token")?
.context("unknown bearer token")?;
Ok((user, semaphore_permit))
}

@ -16,7 +16,7 @@ use migration::sea_orm::DbErr;
use redis_rate_limiter::redis::RedisError;
use reqwest::header::ToStrError;
use std::error::Error;
use tokio::{task::JoinError, time::Instant};
use tokio::{sync::AcquireError, task::JoinError, time::Instant};
// TODO: take "IntoResponse" instead of Response?
pub type FrontendResult = Result<Response, FrontendErrorResponse>;
@ -26,6 +26,7 @@ pub type FrontendResult = Result<Response, FrontendErrorResponse>;
pub enum FrontendErrorResponse {
AccessDenied,
Anyhow(anyhow::Error),
SemaphoreAcquireError(AcquireError),
Box(Box<dyn Error>),
Database(DbErr),
HeadersError(headers::Error),
@ -199,6 +200,18 @@ impl IntoResponse for FrontendErrorResponse {
debug_assert_ne!(r.status(), StatusCode::OK);
return r;
}
Self::SemaphoreAcquireError(err) => {
warn!("semaphore acquire err={:?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcForwardedResponse::from_string(
// TODO: is it safe to expose all of our anyhow strings?
"semaphore acquire error".to_string(),
Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
None,
),
)
}
Self::StatusCode(status_code, err_msg, err) => {
// different status codes should get different error levels. 500s should warn. 400s should stat
let code = status_code.as_u16();

@ -10,7 +10,6 @@ pub mod users;
use crate::app::Web3ProxyApp;
use axum::{
handler::Handler,
routing::{get, post, put},
Extension, Router,
};
@ -93,7 +92,7 @@ pub async fn serve(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()
// frontend caches
.layer(Extension(response_cache))
// 404 for any unknown routes
.fallback(errors::handler_404.into_service());
.fallback(errors::handler_404);
// run our app with hyper
// TODO: allow only listening on localhost? top_config.app.host.parse()?

@ -15,7 +15,6 @@ use std::sync::Arc;
/// Defaults to rate limiting by IP address, but can also read the Authorization header for a bearer token.
/// If possible, please use a WebSocket instead.
#[debug_handler]
pub async fn proxy_web3_rpc(
Extension(app): Extension<Arc<Web3ProxyApp>>,
ClientIp(ip): ClientIp,
@ -42,15 +41,14 @@ pub async fn proxy_web3_rpc(
/// Can optionally authorized based on origin, referer, or user agent.
/// If possible, please use a WebSocket instead.
#[debug_handler]
pub async fn proxy_web3_rpc_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>,
ClientIp(ip): ClientIp,
Json(payload): Json<JsonRpcRequestEnum>,
origin: Option<TypedHeader<Origin>>,
referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>,
Path(rpc_key): Path<String>,
Json(payload): Json<JsonRpcRequestEnum>,
) -> FrontendResult {
let rpc_key = rpc_key.parse()?;
@ -69,10 +67,7 @@ pub async fn proxy_web3_rpc_with_key(
// the request can take a while, so we spawn so that we can start serving another request
// TODO: spawn even earlier?
let f = tokio::spawn(async move {
app.proxy_web3_rpc(authorization, payload)
.await
});
let f = tokio::spawn(async move { app.proxy_web3_rpc(authorization, payload).await });
// if this is an error, we are likely shutting down
let response = f.await??;

@ -18,19 +18,20 @@ use axum::{
};
use axum_client_ip::ClientIp;
use axum_macros::debug_handler;
use chrono::{TimeZone, Utc};
use entities::sea_orm_active_enums::LogLevel;
use entities::{revert_log, rpc_key, user};
use entities::{login, pending_login, revert_log, rpc_key, user};
use ethers::{prelude::Address, types::Bytes};
use hashbrown::HashMap;
use http::{HeaderValue, StatusCode};
use ipnet::IpNet;
use itertools::Itertools;
use log::warn;
use migration::sea_orm::prelude::Uuid;
use migration::sea_orm::{
self, ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, PaginatorTrait, QueryFilter,
QueryOrder, TransactionTrait, TryIntoModel,
};
use redis_rate_limiter::redis::AsyncCommands;
use serde::Deserialize;
use serde_json::json;
use siwe::{Message, VerificationOpts};
@ -84,7 +85,7 @@ pub async fn user_login_get(
.context("impossible")?
.parse()
// TODO: map_err so this becomes a 401
.context("bad input")?;
.context("unable to parse address")?;
let login_domain = app
.config
@ -95,6 +96,7 @@ pub async fn user_login_get(
// TODO: get most of these from the app config
let message = Message {
// TODO: don't unwrap
// TODO: accept a login_domain from the request?
domain: login_domain.parse().unwrap(),
address: user_address.to_fixed_bytes(),
// TODO: config for statement
@ -111,16 +113,28 @@ pub async fn user_login_get(
resources: vec![],
};
// TODO: if no redis server, store in local cache? at least give a better error. right now this seems to be a 502
// the address isn't enough. we need to save the actual message so we can read the nonce
// TODO: what message format is the most efficient to store in redis? probably eip191_bytes
// we add 1 to expire_seconds just to be sure redis has the key for the full expiration_time
// TODO: store a maximum number of attempted logins? anyone can request so we don't want to allow DOS attacks
let session_key = format!("login_nonce:{}", nonce);
app.redis_conn()
.await?
.set_ex(session_key, message.to_string(), expire_seconds + 1)
.await?;
let db_conn = app.db_conn().context("login requires a database")?;
// massage types to fit in the database. sea-orm does not make this very elegant
let uuid = Uuid::from_u128(nonce.into());
// we add 1 to expire_seconds just to be sure the database has the key for the full expiration_time
let expires_at = Utc
.timestamp_opt(expiration_time.unix_timestamp() + 1, 0)
.unwrap();
// we do not store a maximum number of attempted logins. anyone can request so we don't want to allow DOS attacks
// add a row to the database for this user
let user_pending_login = pending_login::ActiveModel {
id: sea_orm::NotSet,
nonce: sea_orm::Set(uuid),
message: sea_orm::Set(message.to_string()),
expires_at: sea_orm::Set(expires_at),
};
user_pending_login
.save(&db_conn)
.await
.context("saving user's pending_login")?;
// there are multiple ways to sign messages and not all wallets support them
// TODO: default message eip from config?
@ -163,12 +177,11 @@ pub struct PostLogin {
/// It is recommended to save the returned bearer token in a cookie.
/// The bearer token can be used to authenticate other requests, such as getting the user's stats or modifying the user's profile.
#[debug_handler]
pub async fn user_login_post(
Extension(app): Extension<Arc<Web3ProxyApp>>,
ClientIp(ip): ClientIp,
Json(payload): Json<PostLogin>,
Query(query): Query<PostLoginQuery>,
Json(payload): Json<PostLogin>,
) -> FrontendResult {
login_is_authorized(&app, ip).await?;
@ -200,16 +213,25 @@ pub async fn user_login_post(
// the only part of the message we will trust is their nonce
// TODO: this is fragile. have a helper function/struct for redis keys
let login_nonce_key = format!("login_nonce:{}", &their_msg.nonce);
let login_nonce = UserBearerToken::from_str(&their_msg.nonce)?;
// fetch the message we gave them from our redis
let mut redis_conn = app.redis_conn().await?;
// fetch the message we gave them from our database
let db_conn = app.db_conn().context("Getting database connection")?;
let our_msg: Option<String> = redis_conn.get(&login_nonce_key).await?;
// massage type for the db
let login_nonce_uuid: Uuid = login_nonce.clone().into();
let our_msg: String = our_msg.context("login nonce not found")?;
let user_pending_login = pending_login::Entity::find()
.filter(pending_login::Column::Nonce.eq(login_nonce_uuid))
.one(&db_conn)
.await
.context("database error while finding pending_login")?
.context("login nonce not found")?;
let our_msg: siwe::Message = our_msg.parse().context("parsing siwe message")?;
let our_msg: siwe::Message = user_pending_login
.message
.parse()
.context("parsing siwe message")?;
// default options are fine. the message includes timestamp and domain and nonce
let verify_config = VerificationOpts::default();
@ -234,8 +256,6 @@ pub async fn user_login_post(
}
}
let db_conn = app.db_conn().context("Getting database connection")?;
// TODO: limit columns or load whole user?
let u = user::Entity::find()
.filter(user::Column::Address.eq(our_msg.address.as_ref()))
@ -305,7 +325,7 @@ pub async fn user_login_post(
};
// create a bearer token for the user.
let bearer_token = Ulid::new();
let user_bearer_token = UserBearerToken::default();
// json response with everything in it
// we could return just the bearer token, but I think they will always request api keys and the user profile
@ -314,27 +334,37 @@ pub async fn user_login_post(
.into_iter()
.map(|uk| (uk.id, uk))
.collect::<HashMap<_, _>>(),
"bearer_token": bearer_token,
"bearer_token": user_bearer_token,
"user": u,
});
let response = (status_code, Json(response_json)).into_response();
// add bearer to redis
// TODO: use a helper function/struct for this
let bearer_redis_key = UserBearerToken(bearer_token).to_string();
// add bearer to the database
// expire in 4 weeks
// TODO: get expiration time from app config
redis_conn
.set_ex(bearer_redis_key, u.id.to_string(), 2_419_200)
.await?;
let expires_at = Utc::now()
.checked_add_signed(chrono::Duration::weeks(4))
.unwrap();
if let Err(err) = redis_conn.del::<_, u64>(&login_nonce_key).await {
warn!(
"Failed to delete login_nonce_key {}: {}",
login_nonce_key, err
);
let user_login = login::ActiveModel {
id: sea_orm::NotSet,
bearer_token: sea_orm::Set(user_bearer_token.uuid()),
user_id: sea_orm::Set(u.id),
expires_at: sea_orm::Set(expires_at),
};
user_login
.save(&db_conn)
.await
.context("saving user login")?;
if let Err(err) = user_pending_login
.into_active_model()
.delete(&db_conn)
.await
{
warn!("Failed to delete nonce:{}: {}", login_nonce.0, err);
}
Ok(response)
@ -342,17 +372,21 @@ pub async fn user_login_post(
/// `POST /user/logout` - Forget the bearer token in the `Authentication` header.
#[debug_handler]
pub async fn user_logout_post(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
) -> FrontendResult {
let mut redis_conn = app.redis_conn().await?;
let user_bearer = UserBearerToken::try_from(bearer)?;
// TODO: i don't like this. move this to a helper function so it is less fragile
let bearer_cache_key = UserBearerToken::try_from(bearer)?.to_string();
let db_conn = app.db_conn().context("database needed for user logout")?;
redis_conn.del(bearer_cache_key).await?;
if let Err(err) = login::Entity::delete_many()
.filter(login::Column::BearerToken.eq(user_bearer.uuid()))
.exec(&db_conn)
.await
{
warn!("Failed to delete {}: {}", user_bearer.redis_key(), err);
}
// TODO: what should the response be? probably json something
Ok("goodbye".into_response())

@ -5,20 +5,24 @@ use axum::{
headers::{authorization::Bearer, Authorization},
TypedHeader,
};
use chrono::NaiveDateTime;
use entities::{rpc_accounting, rpc_key};
use chrono::{NaiveDateTime, Utc};
use entities::{login, rpc_accounting, rpc_key};
use hashbrown::HashMap;
use http::StatusCode;
use log::{debug, warn};
use migration::sea_orm::{
ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder, QuerySelect, Select,
ColumnTrait, DatabaseConnection, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder,
QuerySelect, Select,
};
use migration::{Condition, Expr, SimpleExpr};
use redis_rate_limiter::{redis::AsyncCommands, RedisConnection};
/// get the attached address from redis for the given auth_token.
/// get the attached address for the given bearer token.
/// First checks redis. Then checks the database.
/// 0 means all users
pub async fn get_user_id_from_params(
mut redis_conn: RedisConnection,
db_conn: DatabaseConnection,
// this is a long type. should we strip it down?
bearer: Option<TypedHeader<Authorization<Bearer>>>,
params: &HashMap<String, String>,
@ -26,22 +30,70 @@ pub async fn get_user_id_from_params(
match (bearer, params.get("user_id")) {
(Some(TypedHeader(Authorization(bearer))), Some(user_id)) => {
// check for the bearer cache key
let bearer_cache_key = UserBearerToken::try_from(bearer)?.to_string();
let user_bearer_token = UserBearerToken::try_from(bearer)?;
let user_redis_key = user_bearer_token.redis_key();
let mut save_to_redis = false;
// get the user id that is attached to this bearer token
let bearer_user_id = redis_conn
.get::<_, u64>(bearer_cache_key)
.await
// TODO: this should be a 403
.context("fetching rpc_key_id from redis with bearer_cache_key")?;
let bearer_user_id = match redis_conn.get::<_, u64>(&user_redis_key).await {
Err(_) => {
// TODO: inspect the redis error? if redis is down we should warn
let user_login = login::Entity::find()
.filter(login::Column::BearerToken.eq(user_bearer_token.uuid()))
.one(&db_conn)
.await
.context("database error while querying for user")?
.ok_or(FrontendErrorResponse::AccessDenied)?;
// check expiration. if expired, delete ALL expired pending_logins
let now = Utc::now();
if now > user_login.expires_at {
// this row is expired! do not allow auth!
let delete_result = login::Entity::delete_many()
.filter(login::Column::ExpiresAt.lte(now))
.exec(&db_conn)
.await?;
debug!("cleared expired pending_logins: {:?}", delete_result);
return Err(FrontendErrorResponse::AccessDenied);
}
save_to_redis = true;
user_login.user_id
}
Ok(x) => {
// TODO: save it to redis again to extend the key?
x
}
};
let user_id: u64 = user_id.parse().context("Parsing user_id param")?;
if bearer_user_id != user_id {
Err(FrontendErrorResponse::AccessDenied)
} else {
Ok(bearer_user_id)
return Err(FrontendErrorResponse::AccessDenied);
}
if save_to_redis {
// TODO: how long? we store in database for 4 weeks
let one_day = 60 * 60 * 24;
if let Err(err) = redis_conn
.set_ex::<_, _, ()>(user_redis_key, user_id, one_day)
.await
{
warn!("Unable to save user bearer token to redis: {}", err)
}
}
Ok(bearer_user_id)
}
(_, None) => {
// they have a bearer token. we don't care about it on public pages
@ -280,7 +332,7 @@ pub async fn query_user_stats<'a>(
// get_user_id_from_params checks that the bearer is connected to this user_id
// TODO: match on user_id and rpc_key_id?
let user_id = get_user_id_from_params(redis_conn, bearer, params).await?;
let user_id = get_user_id_from_params(redis_conn, db_conn.clone(), bearer, params).await?;
let (condition, q) = if user_id == 0 {
// 0 means everyone. don't filter on user
// TODO: 0 or None?

@ -1,9 +1,53 @@
use std::str::FromStr;
use axum::headers::authorization::Bearer;
use migration::sea_orm::prelude::Uuid;
use serde::Serialize;
use ulid::Ulid;
/// Key used for caching the user's login
#[derive(Clone, Hash, PartialEq, Eq, Serialize)]
#[serde(transparent)]
pub struct UserBearerToken(pub Ulid);
impl UserBearerToken {
pub fn redis_key(&self) -> String {
format!("bearer:{}", self.0)
}
pub fn uuid(&self) -> Uuid {
Uuid::from_u128(self.0.into())
}
}
impl Default for UserBearerToken {
fn default() -> Self {
Self(Ulid::new())
}
}
impl FromStr for UserBearerToken {
type Err = ulid::DecodeError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let ulid = Ulid::from_str(s)?;
Ok(Self(ulid))
}
}
impl From<Ulid> for UserBearerToken {
fn from(x: Ulid) -> Self {
Self(x)
}
}
impl From<UserBearerToken> for Uuid {
fn from(x: UserBearerToken) -> Self {
x.uuid()
}
}
impl TryFrom<Bearer> for UserBearerToken {
type Error = ulid::DecodeError;
@ -13,9 +57,3 @@ impl TryFrom<Bearer> for UserBearerToken {
Ok(UserBearerToken(u))
}
}
impl ToString for UserBearerToken {
fn to_string(&self) -> String {
format!("bearer:{}", self.0)
}
}