From ea5f791560210c468f6911e3414e82caa5f401dc Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 14 Nov 2022 18:24:52 +0000 Subject: [PATCH] simple lock around database migrations --- Cargo.lock | 1 - TODO.md | 3 ++ web3_proxy/Cargo.toml | 4 +- web3_proxy/src/app.rs | 53 ++++++++++++++++--- web3_proxy/src/app_stats.rs | 2 +- .../src/bin/web3_proxy_cli/create_user.rs | 2 +- web3_proxy/src/config.rs | 2 +- web3_proxy/src/frontend/authorization.rs | 2 +- web3_proxy/src/frontend/errors.rs | 2 +- web3_proxy/src/frontend/users.rs | 6 +-- web3_proxy/src/rpcs/connection.rs | 3 +- web3_proxy/src/rpcs/connections.rs | 2 +- web3_proxy/src/rpcs/request.rs | 18 +++++-- web3_proxy/src/user_queries.rs | 6 +-- 14 files changed, 77 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0d69b884..f1e24491 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5468,7 +5468,6 @@ dependencies = [ "regex", "reqwest", "rustc-hash", - "sea-orm", "sentry", "serde", "serde_json", diff --git a/TODO.md b/TODO.md index 619dfa04..c81ad1ac 100644 --- a/TODO.md +++ b/TODO.md @@ -236,12 +236,15 @@ These are roughly in order of completition - [x] improve sorting servers by weight. don't force to lower weights, still have a probability that smaller weights might be - [x] flamegraphs show 52% of the time to be in tracing. replace with simpler logging - [x] add optional display name to rpc configs +- [x] add locking around running migrations - [-] add configurable size limits to all the Caches - instead of configuring each cache with MB sizes, have one value for total memory footprint and then percentages for each cache - [ ] add block timestamp to the /status page - [ ] cache the status page for a second - [ ] probably need to turn more sentry log integrations (like anyhow) on! - [ ] tests should use `test-env-log = "0.2.8"` +- [ ] weighted random choice should still prioritize non-archive servers + - maybe shuffle randomly and then sort by (block_limit, random_index)? - [ ] actually block unauthenticated requests instead of emitting warning of "allowing without auth during development!" diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index d9a412e1..19057bad 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -7,9 +7,8 @@ default-run = "web3_proxy" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["deadlock_detection", "verbose_db"] +default = ["deadlock_detection"] deadlock_detection = ["parking_lot/deadlock_detection"] -verbose_db = ["sea-orm/debug-print"] # TODO: turn tokio-console on with a feature. console-subscriber = { version = "0.1.7" } @@ -56,7 +55,6 @@ reqwest = { version = "0.11.12", default-features = false, features = ["json", " handlebars = "4.3.5" rustc-hash = "1.1.0" siwe = "0.5.0" -sea-orm = { version = "0.10.2", features = ["macros"] } sentry = { version = "0.28.0", default-features = false, features = ["backtrace", "contexts", "panic", "anyhow", "reqwest", "rustls", "log", "sentry-log"] } serde = { version = "1.0.147", features = [] } serde_json = { version = "1.0.87", default-features = false, features = ["alloc", "raw_value"] } diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index 5a34b228..8f13ddf3 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -25,12 +25,13 @@ use futures::stream::FuturesUnordered; use futures::stream::StreamExt; use hashbrown::HashMap; use ipnet::IpNet; -use log::{error, info, warn}; +use log::{debug, error, info, warn}; use metered::{metered, ErrorCount, HitCount, ResponseTime, Throughput}; -use migration::{Migrator, MigratorTrait}; +use migration::sea_orm::{self, ConnectionTrait, Database, DatabaseConnection}; +use migration::sea_query::table::ColumnDef; +use migration::{Alias, Migrator, MigratorTrait, Table}; use moka::future::Cache; use redis_rate_limiter::{DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter}; -use sea_orm::DatabaseConnection; use serde::Serialize; use serde_json::json; use std::fmt; @@ -42,7 +43,7 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::{broadcast, watch, Semaphore}; use tokio::task::JoinHandle; -use tokio::time::timeout; +use tokio::time::{sleep, timeout}; use tokio_stream::wrappers::{BroadcastStream, WatchStream}; use ulid::Ulid; @@ -168,10 +169,48 @@ pub async fn get_migrated_db( .sqlx_logging(false); // .sqlx_logging_level(log::LevelFilter::Info); - let db_conn = sea_orm::Database::connect(db_opt).await?; + let db_conn = Database::connect(db_opt).await?; - // TODO: if error, roll back? - Migrator::up(&db_conn, None).await?; + let db_backend = db_conn.get_database_backend(); + + let migration_lock_table_ref = Alias::new("migration_lock"); + + // TODO: put the timestamp into this? + let create_lock_statment = db_backend.build( + Table::create() + .table(migration_lock_table_ref.clone()) + .col(ColumnDef::new(Alias::new("locked")).boolean().default(true)), + ); + let drop_lock_statment = db_backend.build(Table::drop().table(migration_lock_table_ref)); + + loop { + if Migrator::get_pending_migrations(&db_conn).await?.is_empty() { + info!("no migrations to apply"); + return Ok(db_conn); + } + + // there are migrations to apply + // acquire a lock + if let Err(err) = db_conn.execute(create_lock_statment.clone()).await { + debug!("Unable to acquire lock. err={:?}", err); + + // TODO: exponential backoff with jitter + sleep(Duration::from_secs(1)).await; + + continue; + } + + debug!("migration lock acquired"); + break; + } + + let migration_result = Migrator::up(&db_conn, None).await; + + // drop the distributed lock + db_conn.execute(drop_lock_statment).await?; + + // return if migrations erred + migration_result?; Ok(db_conn) } diff --git a/web3_proxy/src/app_stats.rs b/web3_proxy/src/app_stats.rs index 0510a9bf..a22199c8 100644 --- a/web3_proxy/src/app_stats.rs +++ b/web3_proxy/src/app_stats.rs @@ -7,7 +7,7 @@ use entities::rpc_accounting; use hashbrown::HashMap; use hdrhistogram::{Histogram, RecordError}; use log::{error, info}; -use sea_orm::{ActiveModelTrait, DatabaseConnection, DbErr}; +use migration::sea_orm::{self, ActiveModelTrait, DatabaseConnection, DbErr}; use std::num::NonZeroU64; use std::sync::atomic::Ordering; use std::sync::Arc; diff --git a/web3_proxy/src/bin/web3_proxy_cli/create_user.rs b/web3_proxy/src/bin/web3_proxy_cli/create_user.rs index db14eac2..5dbe6d57 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/create_user.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/create_user.rs @@ -3,7 +3,7 @@ use argh::FromArgs; use entities::{rpc_key, user}; use ethers::prelude::Address; use log::info; -use sea_orm::{ActiveModelTrait, TransactionTrait}; +use migration::sea_orm::{self, ActiveModelTrait, TransactionTrait}; use ulid::Ulid; use uuid::Uuid; use web3_proxy::frontend::authorization::RpcSecretKey; diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 460e77a2..e001b2df 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -6,7 +6,7 @@ use argh::FromArgs; use derive_more::Constructor; use ethers::prelude::TxHash; use hashbrown::HashMap; -use sea_orm::DatabaseConnection; +use migration::sea_orm::DatabaseConnection; use serde::Deserialize; use std::sync::Arc; use tokio::sync::broadcast; diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 5da0c230..eda0cb6f 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -14,9 +14,9 @@ use hashbrown::HashMap; use http::HeaderValue; use ipnet::IpNet; use log::error; +use migration::sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter}; use redis_rate_limiter::redis::AsyncCommands; use redis_rate_limiter::RedisRateLimitResult; -use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter}; use std::fmt::Display; use std::sync::atomic::{AtomicBool, AtomicU64}; use std::{net::IpAddr, str::FromStr, sync::Arc}; diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index 6b2fcb08..b72d4d55 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -12,9 +12,9 @@ use derive_more::From; use http::header::InvalidHeaderValue; use ipnet::AddrParseError; use log::warn; +use migration::sea_orm::DbErr; use redis_rate_limiter::redis::RedisError; use reqwest::header::ToStrError; -use sea_orm::DbErr; use std::error::Error; use tokio::{task::JoinError, time::Instant}; diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index 9ac78824..9377a916 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -25,11 +25,11 @@ use http::{HeaderValue, StatusCode}; use ipnet::IpNet; use itertools::Itertools; use log::warn; -use redis_rate_limiter::redis::AsyncCommands; -use sea_orm::{ - ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder, +use migration::sea_orm::{ + self, ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder, TransactionTrait, TryIntoModel, }; +use redis_rate_limiter::redis::AsyncCommands; use serde::Deserialize; use serde_json::json; use siwe::{Message, VerificationOpts}; diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index 1b846190..b181b705 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -10,9 +10,9 @@ use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64}; use futures::future::try_join_all; use futures::StreamExt; use log::{debug, error, info, warn, Level}; +use migration::sea_orm::DatabaseConnection; use parking_lot::RwLock; use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter}; -use sea_orm::DatabaseConnection; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::json; @@ -289,6 +289,7 @@ impl Web3Connection { initial_sleep: bool, ) -> anyhow::Result<()> { // there are several crates that have retry helpers, but they all seem more complex than necessary + // TODO: move this backoff logic into a helper function so we can use it when doing database locking let base_ms = 500; let cap_ms = 30_000; let range_multiplier = 3; diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index b3544d62..bfe6a837 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -20,9 +20,9 @@ use futures::stream::FuturesUnordered; use futures::StreamExt; use hashbrown::HashMap; use log::{error, info, warn, Level}; +use migration::sea_orm::DatabaseConnection; use moka::future::{Cache, ConcurrentCacheExt}; use petgraph::graphmap::DiGraphMap; -use sea_orm::DatabaseConnection; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::json; diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index a7dc6224..314ca655 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -13,8 +13,7 @@ use metered::metered; use metered::HitCount; use metered::ResponseTime; use metered::Throughput; -use sea_orm::ActiveEnum; -use sea_orm::ActiveModelTrait; +use migration::sea_orm::{self, ActiveEnum, ActiveModelTrait}; use serde_json::json; use std::fmt; use std::sync::atomic::{self, AtomicBool, Ordering}; @@ -276,14 +275,23 @@ impl OpenRequestHandle { RequestErrorHandler::DebugLevel => { // TODO: think about this revert check more. sometimes we might want reverts logged so this needs a flag if !is_revert { - debug!("bad response from {}! method={} params={:?} err={:?}", self.conn, method, params, err); + debug!( + "bad response from {}! method={} params={:?} err={:?}", + self.conn, method, params, err + ); } } RequestErrorHandler::ErrorLevel => { - error!("bad response from {}! method={} params={:?} err={:?}", self.conn, method, params, err); + error!( + "bad response from {}! method={} params={:?} err={:?}", + self.conn, method, params, err + ); } RequestErrorHandler::WarnLevel => { - warn!("bad response from {}! method={} params={:?} err={:?}", self.conn, method, params, err); + warn!( + "bad response from {}! method={} params={:?} err={:?}", + self.conn, method, params, err + ); } RequestErrorHandler::SaveReverts => { // TODO: do not unwrap! (doesn't matter much since we check method as a string above) diff --git a/web3_proxy/src/user_queries.rs b/web3_proxy/src/user_queries.rs index 55e7cc95..8680183a 100644 --- a/web3_proxy/src/user_queries.rs +++ b/web3_proxy/src/user_queries.rs @@ -10,11 +10,11 @@ use entities::{rpc_accounting, rpc_key}; use hashbrown::HashMap; use http::StatusCode; use log::warn; -use migration::{Condition, Expr, SimpleExpr}; -use redis_rate_limiter::{redis::AsyncCommands, RedisConnection}; -use sea_orm::{ +use migration::sea_orm::{ ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder, QuerySelect, Select, }; +use migration::{Condition, Expr, SimpleExpr}; +use redis_rate_limiter::{redis::AsyncCommands, RedisConnection}; /// get the attached address from redis for the given auth_token. /// 0 means all users