setup database and stub migrations

This commit is contained in:
Bryan Stitt 2022-07-26 04:53:38 +00:00
parent f762f7bd10
commit 4cb65b0fa4
26 changed files with 1122 additions and 148 deletions

1
.gitignore vendored

@ -1,4 +1,5 @@
/config/*.toml
/data
flamegraph.svg
perf.data
perf.data.old

947
Cargo.lock generated

File diff suppressed because it is too large Load Diff

@ -1,5 +1,7 @@
[workspace]
members = [
"entities",
"migration",
"linkedhashmap",
"redis-cell-client",
"web3-proxy",

@ -11,6 +11,7 @@ All other requests are sent to an RPC server on the latest block (alchemy, moral
Each server has different limits to configure. The `soft_limit` is the number of parallel active requests where a server starts to slow down. The `hard_limit` is where a server starts giving rate limits or other errors.
```
$ cargo install sea-orm-cli
$ cargo run --release -- --help
```
```

@ -138,6 +138,7 @@ new endpoints for users:
## V2
- [ ] sea-orm brings in async-std, but we are using tokio. benchmark switching
- [ ] jwt auth so people can easily switch from infura
- [ ] handle log subscriptions
- [ ] most things that are cached locally should probably be in shared redis caches

@ -1,5 +1,6 @@
[shared]
chain_id = 1
db_url = "mysql://root:dev_web3_proxy@dev-mysql:3306/dev_web3_proxy"
redis_url = "redis://dev-redis:6379/"
public_rate_limit_per_minute = 60_000
# 1GB of cache

@ -1,5 +0,0 @@
# For documentation on how to configure this file,
# see diesel.rs/guides/configuring-diesel-cli
[print_schema]
file = "src/schema.rs"

@ -11,18 +11,18 @@ services:
dev-db:
image: mysql
# NOTE: use of "mysql_native_password" is not recommended: https://dev.mysql.com/doc/refman/8.0/en/upgrading-from-previous-series.html#upgrade-caching-sha2-password
# (this is just an example, not intended to be a production configuration)
command: --default-authentication-plugin=mysql_native_password
environment:
MYSQL_ROOT_PASSWORD: dev_web3_proxy
MYSQL_DATABASE: dev_web3_proxy
ports:
- 127.0.0.1:3306:3306
volumes:
- ./data/dev_mysql:/var/lib/mysql
dev-adminer:
image: adminer
ports:
- 127.0.0.1:8306:8080
- 8306:8080
environment:
ADMINER_DEFAULT_SERVER: dev-db

8
entities/Cargo.toml Normal file

@ -0,0 +1,8 @@
[package]
name = "entities"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]

3
entities/src/main.rs Normal file

@ -0,0 +1,3 @@
fn main() {
println!("Hello, world!");
}

21
migration/Cargo.toml Normal file

@ -0,0 +1,21 @@
[package]
name = "migration"
version = "0.1.0"
edition = "2021"
publish = false
[lib]
name = "migration"
path = "src/lib.rs"
[dependencies]
async-std = { version = "^1", features = ["attributes", "tokio1"] }
[dependencies.sea-orm-migration]
version = "^0.9.0"
features = [
# Enable at least one `ASYNC_RUNTIME` and `DATABASE_DRIVER` feature if you want to run migration via CLI.
# View the list of supported features at https://www.sea-ql.org/SeaORM/docs/install-and-config/database-and-async-runtime.
"runtime-tokio-rustls", # `ASYNC_RUNTIME` featrure
"sqlx-mysql", # `DATABASE_DRIVER` feature
]

41
migration/README.md Normal file

@ -0,0 +1,41 @@
# Running Migrator CLI
- Generate a new migration file
```sh
cargo run -- migrate generate MIGRATION_NAME
```
- Apply all pending migrations
```sh
cargo run
```
```sh
cargo run -- up
```
- Apply first 10 pending migrations
```sh
cargo run -- up -n 10
```
- Rollback last applied migrations
```sh
cargo run -- down
```
- Rollback last 10 applied migrations
```sh
cargo run -- down -n 10
```
- Drop all tables from the database, then reapply all migrations
```sh
cargo run -- fresh
```
- Rollback all applied migrations, then reapply all migrations
```sh
cargo run -- refresh
```
- Rollback all applied migrations
```sh
cargo run -- reset
```
- Check the status of all migrations
```sh
cargo run -- status
```

45
migration/first_draft.sql Normal file

@ -0,0 +1,45 @@
CREATE TABLE users (
id SERIAL PRIMARY KEY,
chain INT,
primary_address VARCHAR(42),
description VARCHAR(255),
email VARCHAR(320),
)
-- TODO: foreign key
-- TODO: how should we store addresses?
-- TODO: creation time?
-- TODO: permissions. likely similar to infura
CREATE TABLE secondary_users (
id SERIAL PRIMARY KEY,
users_id BIGINT,
secondary_address VARCHAR(42),
chain INT,
description VARCHAR,
email VARCHAR(320),
)
-- TODO: creation time?
CREATE TABLE blocklist (
id SERIAL PRIMARY KEY,
blocked_address VARCHAR,
chain INT,
reason TEXT,
)
-- TODO: foreign key
-- TODO: index on api_key
-- TODO: what size for api_key
-- TODO: track active with a timestamp?
-- TODO: creation time?
-- TODO: requests_per_second INT,
-- TODO: requests_per_day INT,
-- TODO: more security features. likely similar to infura
CREATE TABLE user_keys (
id SERIAL PRIMARY KEY,
users_id BIGINT,
api_key VARCHAR,
description VARCHAR,
private_txs BOOLEAN,
active BOOLEAN,
)

12
migration/src/lib.rs Normal file

@ -0,0 +1,12 @@
pub use sea_orm_migration::prelude::*;
mod m20220101_000001_create_table;
pub struct Migrator;
#[async_trait::async_trait]
impl MigratorTrait for Migrator {
fn migrations() -> Vec<Box<dyn MigrationTrait>> {
vec![Box::new(m20220101_000001_create_table::Migration)]
}
}

@ -0,0 +1,48 @@
use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// Replace the sample below with your own migration scripts
todo!();
manager
.create_table(
Table::create()
.table(Post::Table)
.if_not_exists()
.col(
ColumnDef::new(Post::Id)
.integer()
.not_null()
.auto_increment()
.primary_key(),
)
.col(ColumnDef::new(Post::Title).string().not_null())
.col(ColumnDef::new(Post::Text).string().not_null())
.to_owned(),
)
.await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// Replace the sample below with your own migration scripts
todo!();
manager
.drop_table(Table::drop().table(Post::Table).to_owned())
.await
}
}
/// Learn more at https://docs.rs/sea-query#iden
#[derive(Iden)]
enum Post {
Table,
Id,
Title,
Text,
}

6
migration/src/main.rs Normal file

@ -0,0 +1,6 @@
use sea_orm_migration::prelude::*;
#[async_std::main]
async fn main() {
cli::run_cli(migration::Migrator).await;
}

@ -9,8 +9,13 @@ default-run = "web3-proxy"
[features]
default = ["deadlock_detection"]
deadlock_detection = ["parking_lot/deadlock_detection"]
verbose = ["sea-orm/debug-print"]
[dependencies]
redis-cell-client = { path = "../redis-cell-client" }
entities = { path = "../entities" }
migration = { path = "../migration" }
anyhow = { version = "1.0.58", features = ["backtrace"] }
arc-swap = "1.5.0"
argh = "0.1.8"
@ -19,7 +24,6 @@ axum-client-ip = "0.2.0"
counter = "0.5.6"
dashmap = "5.3.4"
derive_more = "0.99.17"
diesel_migrations = "1.4.0"
dotenv = "0.15.0"
ethers = { version = "0.15.0", features = ["rustls", "ws"] }
fdlimit = "0.2.1"
@ -30,7 +34,6 @@ indexmap = "1.9.1"
linkedhashmap = { path = "../linkedhashmap", features = ["inline-more"] }
notify = "4.0.17"
num = "0.4.0"
redis-cell-client = { path = "../redis-cell-client" }
parking_lot = { version = "0.12.1", features = ["arc_lock"] }
petgraph = "0.6.2"
proctitle = "0.1.1"
@ -39,9 +42,11 @@ regex = "1.6.0"
reqwest = { version = "0.11.11", default-features = false, features = ["json", "tokio-rustls"] }
rustc-hash = "1.1.0"
# siwe = "0.4.0" # blocked by https://github.com/spruceid/siwe-rs/issues/36
sea-orm = { version = "0.9.1", features = [ "macros" ] }
serde = { version = "1.0.140", features = [] }
serde_json = { version = "1.0.82", default-features = false, features = ["alloc", "raw_value"] }
tokio = { version = "1.20.1", features = ["full", "tracing"] }
async-std = { version = "^1", features = ["attributes", "tokio1"] }
toml = "0.5.9"
tracing = "0.1.35"
# TODO: tracing-subscriber has serde and serde_json features that we might want to use

@ -1,5 +0,0 @@
# For documentation on how to configure this file,
# see diesel.rs/guides/configuring-diesel-cli
[print_schema]
file = "src/schema.rs"

@ -1,4 +0,0 @@
DROP TABLE users;
DROP TABLE secondary_users;
DROP TABLE blocklist;
DROP TABLE user_keys;

@ -1,43 +0,0 @@
CREATE TABLE users (
id SERIAL PRIMARY KEY,
primary_address VARCHAR NOT NULL,
chain INT NOT NULL,
description VARCHAR,
email VARCHAR DEFAULT NULL,
)
CREATE TABLE secondary_users (
id SERIAL PRIMARY KEY,
-- TODO: foreign key
users_id BIGINT,
-- TODO: how should we store addresses?
secondary_address VARCHAR NOT NULL,
chain INT NOT NULL,
description VARCHAR,
-- TODO: creation time?
-- TODO: permissions. likely similar to infura
)
CREATE TABLE blocklist (
id SERIAL PRIMARY KEY,
-- TODO: creation time?
blocked_address VARCHAR NOT NULL,
chain INT NOT NULL,
reason TEXT,
)
CREATE TABLE user_keys (
id SERIAL PRIMARY KEY,
-- TODO: foreign key
users_id BIGINT,
-- TODO: index on api_key
api_key VARCHAR NOT NULL,
description VARCHAR,
private_txs BOOLEAN,
-- TODO: track active with a timestamp?
active BOOLEAN,
-- TODO: creation time?
-- TODO: requests_per_second INT,
-- TODO: requests_per_day INT,
-- TODO: more security features. likely similar to infura
)

@ -1,7 +1,7 @@
use anyhow::Context;
use axum::extract::ws::Message;
use dashmap::mapref::entry::Entry as DashMapEntry;
use dashmap::DashMap;
use diesel_migrations::embed_migrations;
use ethers::core::utils::keccak256;
use ethers::prelude::{Address, Block, BlockNumber, Bytes, Transaction, TxHash, H256, U64};
use futures::future::Abortable;
@ -36,8 +36,6 @@ use crate::jsonrpc::JsonRpcForwardedResponseEnum;
use crate::jsonrpc::JsonRpcRequest;
use crate::jsonrpc::JsonRpcRequestEnum;
embed_migrations!("../migrations/");
// TODO: make this customizable?
static APP_USER_AGENT: &str = concat!(
"satoshiandkin/",
@ -264,6 +262,7 @@ pub struct Web3ProxyApp {
pending_tx_sender: broadcast::Sender<TxState>,
pending_transactions: Arc<DashMap<TxHash, TxState>>,
public_rate_limiter: Option<RedisCellClient>,
db_conn: Option<sea_orm::DatabaseConnection>,
}
impl fmt::Debug for Web3ProxyApp {
@ -295,6 +294,27 @@ impl Web3ProxyApp {
// let connection = db_pool.get().await;
// embedded_migrations::run_with_output(&connection, &mut std::io::stdout());
let db_conn = if let Some(db_url) = app_config.shared.db_url {
let mut db_opt = sea_orm::ConnectOptions::new(db_url);
// TODO: load all these options from the config file
db_opt
.max_connections(100)
.min_connections(num_workers.try_into()?)
.connect_timeout(Duration::from_secs(8))
.idle_timeout(Duration::from_secs(8))
.max_lifetime(Duration::from_secs(60))
.sqlx_logging(true);
// .sqlx_logging_level(log::LevelFilter::Info);
let db_conn = sea_orm::Database::connect(db_opt).await?;
Some(db_conn)
} else {
info!("no database");
None
};
let balanced_rpcs = app_config.balanced_rpcs.into_values().collect();
let private_rpcs = if let Some(private_rpcs) = app_config.private_rpcs {
@ -318,14 +338,13 @@ impl Web3ProxyApp {
);
let redis_client_pool = match app_config.shared.redis_url {
Some(redis_address) => {
info!("Connecting to redis on {}", redis_address);
Some(redis_url) => {
info!("Connecting to redis on {}", redis_url);
let manager = RedisConnectionManager::new(redis_address)?;
let manager = RedisConnectionManager::new(redis_url)?;
let min_size = num_workers as u32;
let max_size = min_size * 4;
// TODO: min_idle?
// TODO: set max_size based on max expected concurrent connections? set based on num_workers?
let builder = bb8::Pool::builder()
@ -338,14 +357,17 @@ impl Web3ProxyApp {
Some(pool)
}
None => {
warn!("No redis address");
warn!("no redis connection");
None
}
};
let (head_block_sender, head_block_receiver) = watch::channel(Arc::new(Block::default()));
// TODO: will one receiver lagging be okay? how big should this be?
let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(16);
let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(256);
// TODO: use this? it could listen for confirmed transactions and then clear pending_transactions, but the head_block_sender is doing that
drop(pending_tx_receiver);
// TODO: this will grow unbounded!! add some expiration to this. and probably move to redis
let pending_transactions = Arc::new(DashMap::new());
@ -364,7 +386,8 @@ impl Web3ProxyApp {
Some(pending_tx_sender.clone()),
pending_transactions.clone(),
)
.await?;
.await
.context("balanced rpcs")?;
handles.push(balanced_handle);
@ -384,16 +407,14 @@ impl Web3ProxyApp {
Some(pending_tx_sender.clone()),
pending_transactions.clone(),
)
.await?;
.await
.context("private_rpcs")?;
handles.push(private_handle);
private_rpcs
};
// TODO: use this? it could listen for confirmed transactions and then clear pending_transactions, but the head_block_sender is doing that
drop(pending_tx_receiver);
// TODO: how much should we allow?
let public_max_burst = app_config.shared.public_rate_limit_per_minute / 3;
@ -421,6 +442,7 @@ impl Web3ProxyApp {
pending_tx_sender,
pending_transactions,
public_rate_limiter,
db_conn,
};
let app = Arc::new(app);

@ -1,3 +1,5 @@
// TODO: move this into redis-cell-client
use redis_cell_client::bb8;
use tracing::warn;

@ -39,6 +39,7 @@ pub struct AppConfig {
pub struct RpcSharedConfig {
// TODO: better type for chain_id? max of `u64::MAX / 2 - 36` https://github.com/ethereum/EIPs/issues/2294
pub chain_id: u64,
pub db_url: Option<String>,
pub redis_url: Option<String>,
#[serde(default = "default_public_rate_limit_per_minute")]
pub public_rate_limit_per_minute: u32,
@ -79,7 +80,7 @@ impl Web3ConnectionConfig {
(None, None) => None,
(Some(hard_limit), Some(redis_client_pool)) => Some((hard_limit, redis_client_pool)),
(None, Some(_)) => None,
(Some(hard_limit), None) => {
(Some(_hard_limit), None) => {
return Err(anyhow::anyhow!(
"no redis client pool! needed for hard limit"
))

@ -517,6 +517,8 @@ impl Web3Connections {
}
pub fn has_synced_rpcs(&self) -> bool {
// TODO: require a minimum number of synced rpcs
// TODO: move this whole function to SyncedConnections
if self.synced_connections.load().conns.is_empty() {
return false;
}

@ -174,6 +174,7 @@ mod tests {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.compact()
.with_test_writer()
.init();
let anvil = Anvil::new().spawn();
@ -200,6 +201,7 @@ mod tests {
let app_config = AppConfig {
shared: RpcSharedConfig {
chain_id: 31337,
db_url: None,
redis_url: None,
public_rate_limit_per_minute: 0,
response_cache_max_bytes: 10_usize.pow(7),