simple lock around database migrations

This commit is contained in:
Bryan Stitt 2022-11-14 18:24:52 +00:00
parent c1a10b0342
commit ea5f791560
14 changed files with 77 additions and 29 deletions

1
Cargo.lock generated

@ -5468,7 +5468,6 @@ dependencies = [
"regex",
"reqwest",
"rustc-hash",
"sea-orm",
"sentry",
"serde",
"serde_json",

@ -236,12 +236,15 @@ These are roughly in order of completition
- [x] improve sorting servers by weight. don't force to lower weights, still have a probability that smaller weights might be
- [x] flamegraphs show 52% of the time to be in tracing. replace with simpler logging
- [x] add optional display name to rpc configs
- [x] add locking around running migrations
- [-] add configurable size limits to all the Caches
- instead of configuring each cache with MB sizes, have one value for total memory footprint and then percentages for each cache
- [ ] add block timestamp to the /status page
- [ ] cache the status page for a second
- [ ] probably need to turn more sentry log integrations (like anyhow) on!
- [ ] tests should use `test-env-log = "0.2.8"`
- [ ] weighted random choice should still prioritize non-archive servers
- maybe shuffle randomly and then sort by (block_limit, random_index)?
- [ ] actually block unauthenticated requests instead of emitting warning of "allowing without auth during development!"

@ -7,9 +7,8 @@ default-run = "web3_proxy"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = ["deadlock_detection", "verbose_db"]
default = ["deadlock_detection"]
deadlock_detection = ["parking_lot/deadlock_detection"]
verbose_db = ["sea-orm/debug-print"]
# TODO: turn tokio-console on with a feature. console-subscriber = { version = "0.1.7" }
@ -56,7 +55,6 @@ reqwest = { version = "0.11.12", default-features = false, features = ["json", "
handlebars = "4.3.5"
rustc-hash = "1.1.0"
siwe = "0.5.0"
sea-orm = { version = "0.10.2", features = ["macros"] }
sentry = { version = "0.28.0", default-features = false, features = ["backtrace", "contexts", "panic", "anyhow", "reqwest", "rustls", "log", "sentry-log"] }
serde = { version = "1.0.147", features = [] }
serde_json = { version = "1.0.87", default-features = false, features = ["alloc", "raw_value"] }

@ -25,12 +25,13 @@ use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use hashbrown::HashMap;
use ipnet::IpNet;
use log::{error, info, warn};
use log::{debug, error, info, warn};
use metered::{metered, ErrorCount, HitCount, ResponseTime, Throughput};
use migration::{Migrator, MigratorTrait};
use migration::sea_orm::{self, ConnectionTrait, Database, DatabaseConnection};
use migration::sea_query::table::ColumnDef;
use migration::{Alias, Migrator, MigratorTrait, Table};
use moka::future::Cache;
use redis_rate_limiter::{DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter};
use sea_orm::DatabaseConnection;
use serde::Serialize;
use serde_json::json;
use std::fmt;
@ -42,7 +43,7 @@ use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, watch, Semaphore};
use tokio::task::JoinHandle;
use tokio::time::timeout;
use tokio::time::{sleep, timeout};
use tokio_stream::wrappers::{BroadcastStream, WatchStream};
use ulid::Ulid;
@ -168,10 +169,48 @@ pub async fn get_migrated_db(
.sqlx_logging(false);
// .sqlx_logging_level(log::LevelFilter::Info);
let db_conn = sea_orm::Database::connect(db_opt).await?;
let db_conn = Database::connect(db_opt).await?;
// TODO: if error, roll back?
Migrator::up(&db_conn, None).await?;
let db_backend = db_conn.get_database_backend();
let migration_lock_table_ref = Alias::new("migration_lock");
// TODO: put the timestamp into this?
let create_lock_statment = db_backend.build(
Table::create()
.table(migration_lock_table_ref.clone())
.col(ColumnDef::new(Alias::new("locked")).boolean().default(true)),
);
let drop_lock_statment = db_backend.build(Table::drop().table(migration_lock_table_ref));
loop {
if Migrator::get_pending_migrations(&db_conn).await?.is_empty() {
info!("no migrations to apply");
return Ok(db_conn);
}
// there are migrations to apply
// acquire a lock
if let Err(err) = db_conn.execute(create_lock_statment.clone()).await {
debug!("Unable to acquire lock. err={:?}", err);
// TODO: exponential backoff with jitter
sleep(Duration::from_secs(1)).await;
continue;
}
debug!("migration lock acquired");
break;
}
let migration_result = Migrator::up(&db_conn, None).await;
// drop the distributed lock
db_conn.execute(drop_lock_statment).await?;
// return if migrations erred
migration_result?;
Ok(db_conn)
}

@ -7,7 +7,7 @@ use entities::rpc_accounting;
use hashbrown::HashMap;
use hdrhistogram::{Histogram, RecordError};
use log::{error, info};
use sea_orm::{ActiveModelTrait, DatabaseConnection, DbErr};
use migration::sea_orm::{self, ActiveModelTrait, DatabaseConnection, DbErr};
use std::num::NonZeroU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;

@ -3,7 +3,7 @@ use argh::FromArgs;
use entities::{rpc_key, user};
use ethers::prelude::Address;
use log::info;
use sea_orm::{ActiveModelTrait, TransactionTrait};
use migration::sea_orm::{self, ActiveModelTrait, TransactionTrait};
use ulid::Ulid;
use uuid::Uuid;
use web3_proxy::frontend::authorization::RpcSecretKey;

@ -6,7 +6,7 @@ use argh::FromArgs;
use derive_more::Constructor;
use ethers::prelude::TxHash;
use hashbrown::HashMap;
use sea_orm::DatabaseConnection;
use migration::sea_orm::DatabaseConnection;
use serde::Deserialize;
use std::sync::Arc;
use tokio::sync::broadcast;

@ -14,9 +14,9 @@ use hashbrown::HashMap;
use http::HeaderValue;
use ipnet::IpNet;
use log::error;
use migration::sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::RedisRateLimitResult;
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use std::fmt::Display;
use std::sync::atomic::{AtomicBool, AtomicU64};
use std::{net::IpAddr, str::FromStr, sync::Arc};

@ -12,9 +12,9 @@ use derive_more::From;
use http::header::InvalidHeaderValue;
use ipnet::AddrParseError;
use log::warn;
use migration::sea_orm::DbErr;
use redis_rate_limiter::redis::RedisError;
use reqwest::header::ToStrError;
use sea_orm::DbErr;
use std::error::Error;
use tokio::{task::JoinError, time::Instant};

@ -25,11 +25,11 @@ use http::{HeaderValue, StatusCode};
use ipnet::IpNet;
use itertools::Itertools;
use log::warn;
use redis_rate_limiter::redis::AsyncCommands;
use sea_orm::{
ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder,
use migration::sea_orm::{
self, ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder,
TransactionTrait, TryIntoModel,
};
use redis_rate_limiter::redis::AsyncCommands;
use serde::Deserialize;
use serde_json::json;
use siwe::{Message, VerificationOpts};

@ -10,9 +10,9 @@ use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64};
use futures::future::try_join_all;
use futures::StreamExt;
use log::{debug, error, info, warn, Level};
use migration::sea_orm::DatabaseConnection;
use parking_lot::RwLock;
use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter};
use sea_orm::DatabaseConnection;
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use serde_json::json;
@ -289,6 +289,7 @@ impl Web3Connection {
initial_sleep: bool,
) -> anyhow::Result<()> {
// there are several crates that have retry helpers, but they all seem more complex than necessary
// TODO: move this backoff logic into a helper function so we can use it when doing database locking
let base_ms = 500;
let cap_ms = 30_000;
let range_multiplier = 3;

@ -20,9 +20,9 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt;
use hashbrown::HashMap;
use log::{error, info, warn, Level};
use migration::sea_orm::DatabaseConnection;
use moka::future::{Cache, ConcurrentCacheExt};
use petgraph::graphmap::DiGraphMap;
use sea_orm::DatabaseConnection;
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use serde_json::json;

@ -13,8 +13,7 @@ use metered::metered;
use metered::HitCount;
use metered::ResponseTime;
use metered::Throughput;
use sea_orm::ActiveEnum;
use sea_orm::ActiveModelTrait;
use migration::sea_orm::{self, ActiveEnum, ActiveModelTrait};
use serde_json::json;
use std::fmt;
use std::sync::atomic::{self, AtomicBool, Ordering};
@ -276,14 +275,23 @@ impl OpenRequestHandle {
RequestErrorHandler::DebugLevel => {
// TODO: think about this revert check more. sometimes we might want reverts logged so this needs a flag
if !is_revert {
debug!("bad response from {}! method={} params={:?} err={:?}", self.conn, method, params, err);
debug!(
"bad response from {}! method={} params={:?} err={:?}",
self.conn, method, params, err
);
}
}
RequestErrorHandler::ErrorLevel => {
error!("bad response from {}! method={} params={:?} err={:?}", self.conn, method, params, err);
error!(
"bad response from {}! method={} params={:?} err={:?}",
self.conn, method, params, err
);
}
RequestErrorHandler::WarnLevel => {
warn!("bad response from {}! method={} params={:?} err={:?}", self.conn, method, params, err);
warn!(
"bad response from {}! method={} params={:?} err={:?}",
self.conn, method, params, err
);
}
RequestErrorHandler::SaveReverts => {
// TODO: do not unwrap! (doesn't matter much since we check method as a string above)

@ -10,11 +10,11 @@ use entities::{rpc_accounting, rpc_key};
use hashbrown::HashMap;
use http::StatusCode;
use log::warn;
use migration::{Condition, Expr, SimpleExpr};
use redis_rate_limiter::{redis::AsyncCommands, RedisConnection};
use sea_orm::{
use migration::sea_orm::{
ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder, QuerySelect, Select,
};
use migration::{Condition, Expr, SimpleExpr};
use redis_rate_limiter::{redis::AsyncCommands, RedisConnection};
/// get the attached address from redis for the given auth_token.
/// 0 means all users