add support for optional db replica

also add cleanup of expired login data
Bryan Stitt 2022-12-16 00:48:24 -08:00
parent 0a60ccd95e
commit 1465ee355c
8 changed files with 157 additions and 48 deletions

@@ -261,6 +261,8 @@ These are roughly in order of completion
These are not yet ordered. There might be duplicates. We might not actually need all of these.
- [x] cache user stats in redis and with headers
- [x] optional read-only database connection
- [ ] rate limiting/throttling on query_user_stats
- [ ] minimum allowed query_start on query_user_stats
- [ ] query_user_stats cache hit rate
@@ -573,3 +575,4 @@ in another repo: event subscriber
- [ ] if it is too long (the last 4 bytes must be zero), give an error so descriptions like this stand out
- [ ] we need to use docker-compose's proper environment variable handling, because right now if someone tries to start the dev containers in their prod environment, --remove-orphans stops and removes the prod containers
- [ ] change invite codes to set the user_tier
- [ ] some cli commands should use the replica if possible

@@ -8,6 +8,9 @@ db_max_connections = 99
# production runs inside docker and so uses "mysql://root:web3_proxy@db:3306/web3_proxy" for db_url
db_url = "mysql://root:dev_web3_proxy@127.0.0.1:13306/dev_web3_proxy"
# read-only replica useful when running the proxy in multiple regions
db_replica_url = "mysql://root:dev_web3_proxy@127.0.0.1:13306/dev_web3_proxy"
# thundering herd protection
# only mark a block as the head block if the sum of the soft limits of the rpcs serving it is greater than or equal to min_sum_soft_limit
min_sum_soft_limit = 2_000
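# The replica pool can be sized separately. Per the AppConfig fields added
# below, each setting falls back to db_min_connections / db_max_connections
# when unset. A hedged sketch; the values are assumptions, not part of this commit:
db_replica_min_connections = 2
db_replica_max_connections = 99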

@@ -93,6 +93,18 @@ pub struct AuthorizationChecks {
pub log_revert_chance: f64,
}
/// Simple wrapper so that we can keep track of read-only connections.
/// The compiler does nothing to block writes through this connection!
#[derive(Clone)]
pub struct DatabaseReplica(pub DatabaseConnection);
// TODO: I feel like we could do something smart with Deref or AsRef or Borrow, but that wasn't working for me
impl DatabaseReplica {
pub fn conn(&self) -> &DatabaseConnection {
&self.0
}
}
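// The TODO above mentions Deref; a minimal sketch (an assumption, not part of
// this commit) that would let a &DatabaseReplica coerce to &DatabaseConnection:
impl std::ops::Deref for DatabaseReplica {
    type Target = DatabaseConnection;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}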
/// The application
// TODO: this debug impl is way too verbose. make something smaller
// TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard
@@ -108,6 +120,7 @@ pub struct Web3ProxyApp {
pending_tx_sender: broadcast::Sender<TxStatus>,
pub config: AppConfig,
pub db_conn: Option<sea_orm::DatabaseConnection>,
pub db_replica: Option<DatabaseReplica>,
/// prometheus metrics
app_metrics: Arc<Web3ProxyAppMetrics>,
open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
@@ -269,8 +282,11 @@ impl Web3ProxyApp {
let app_metrics = Default::default();
let open_request_handle_metrics: Arc<OpenRequestHandleMetrics> = Default::default();
let mut db_conn = None::<DatabaseConnection>;
let mut db_replica = None::<DatabaseReplica>;
// connect to mysql and make sure the latest migrations have run
let db_conn = if let Some(db_url) = top_config.app.db_url.clone() {
if let Some(db_url) = top_config.app.db_url.clone() {
let db_min_connections = top_config
.app
.db_min_connections
@@ -282,12 +298,39 @@ impl Web3ProxyApp {
.db_max_connections
.unwrap_or(db_min_connections * 2);
let db_conn = get_migrated_db(db_url, db_min_connections, db_max_connections).await?;
db_conn = Some(get_migrated_db(db_url, db_min_connections, db_max_connections).await?);
Some(db_conn)
db_replica = if let Some(db_replica_url) = top_config.app.db_replica_url.clone() {
let db_replica_min_connections = top_config
.app
.db_replica_min_connections
.unwrap_or(db_min_connections);
let db_replica_max_connections = top_config
.app
.db_replica_max_connections
.unwrap_or(db_max_connections);
let db_replica = get_db(
db_replica_url,
db_replica_min_connections,
db_replica_max_connections,
)
.await?;
Some(DatabaseReplica(db_replica))
} else {
// just clone so that we don't need a bunch of checks all over our code
db_conn.clone().map(DatabaseReplica)
};
} else {
if top_config.app.db_replica_url.is_some() {
return Err(anyhow::anyhow!(
"if there is a db_replica_url, there must be a db_url"
));
}
warn!("no database. some features will be disabled");
None
};
let balanced_rpcs = top_config.balanced_rpcs;
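// Net effect of the connection setup above (a summary, not code from this commit):
// - db_url and db_replica_url set -> db_conn is the primary, db_replica is a real replica
// - only db_url set               -> db_replica = DatabaseReplica(primary.clone())
// - only db_replica_url set       -> startup error
// - neither set                   -> both are None and a warning is logged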
@@ -570,6 +613,7 @@ impl Web3ProxyApp {
frontend_key_rate_limiter,
login_rate_limiter,
db_conn,
db_replica,
vredis_pool,
app_metrics,
open_request_handle_metrics,
@@ -677,6 +721,10 @@ impl Web3ProxyApp {
self.db_conn.clone()
}
pub fn db_replica(&self) -> Option<DatabaseReplica> {
self.db_replica.clone()
}
pub async fn redis_conn(&self) -> anyhow::Result<redis_rate_limiter::RedisConnection> {
match self.vredis_pool.as_ref() {
None => Err(anyhow::anyhow!("no redis server configured")),

@@ -71,6 +71,17 @@ pub struct AppConfig {
/// If none, the minimum * 2 is used.
pub db_max_connections: Option<u32>,
/// Read-only replica of db_url.
pub db_replica_url: Option<String>,
/// minimum size of the connection pool for the database replica.
/// If none, db_min_connections is used.
pub db_replica_min_connections: Option<u32>,
/// maximum size of the connection pool for the database replica.
/// If none, db_max_connections is used.
pub db_replica_max_connections: Option<u32>,
/// Default request limit for registered users.
/// 0 = block all requests
/// None = allow all requests

@@ -54,6 +54,7 @@ pub enum AuthorizationType {
#[derive(Clone, Debug)]
pub struct Authorization {
pub checks: AuthorizationChecks,
// TODO: instead of the conn, have a channel?
pub db_conn: Option<DatabaseConnection>,
pub ip: IpAddr,
pub origin: Option<Origin>,
@@ -437,8 +438,8 @@ impl Web3ProxyApp {
let semaphore_permit = semaphore.acquire_owned().await?;
// get the attached address from the database for the given auth_token.
let db_conn = self
.db_conn()
let db_replica = self
.db_replica()
.context("checking if bearer token is authorized")?;
let user_bearer_uuid: Uuid = user_bearer_token.into();
@@ -446,7 +447,7 @@
let user = user::Entity::find()
.left_join(login::Entity)
.filter(login::Column::BearerToken.eq(user_bearer_uuid))
.one(&db_conn)
.one(db_replica.conn())
.await
.context("fetching user from db by bearer token")?
.context("unknown bearer token")?;
@@ -570,9 +571,9 @@ impl Web3ProxyApp {
let authorization_checks: Result<_, Arc<anyhow::Error>> = self
.rpc_secret_key_cache
.try_get_with(rpc_secret_key.into(), async move {
// // trace!(?rpc_secret_key, "user cache miss");
// trace!(?rpc_secret_key, "user cache miss");
let db_conn = self.db_conn().context("Getting database connection")?;
let db_replica = self.db_replica().context("Getting database connection")?;
let rpc_secret_key: Uuid = rpc_secret_key.into();
@@ -582,20 +583,20 @@
match rpc_key::Entity::find()
.filter(rpc_key::Column::SecretKey.eq(rpc_secret_key))
.filter(rpc_key::Column::Active.eq(true))
.one(&db_conn)
.one(db_replica.conn())
.await?
{
Some(rpc_key_model) => {
// TODO: move these splits into helper functions
// TODO: can we have sea orm handle this for us?
let user_model = user::Entity::find_by_id(rpc_key_model.user_id)
.one(&db_conn)
.one(db_replica.conn())
.await?
.expect("related user");
let user_tier_model =
user_tier::Entity::find_by_id(user_model.user_tier_id)
.one(&db_conn)
.one(db_replica.conn())
.await?
.expect("related user tier");

@@ -13,7 +13,6 @@ use std::sync::Arc;
/// Health check page for load balancers to use.
#[debug_handler]
pub async fn health(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
// TODO: also check that the head block is not too old
if app.balanced_rpcs.synced() {
@@ -27,7 +26,6 @@ pub async fn health(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
///
/// TODO: when done debugging, remove this and only allow access on a different port
#[debug_handler]
pub async fn prometheus(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
app.prometheus_metrics()
}
@@ -36,7 +34,6 @@ pub async fn prometheus(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
///
/// TODO: replace this with proper stats and monitoring
#[debug_handler]
pub async fn status(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(response_cache): Extension<FrontendResponseCache>,
@@ -48,6 +45,7 @@ pub async fn status(
// TODO: what else should we include? uptime, cache hit rates, cpu load
let body = json!({
"chain_id": app.config.chain_id,
"pending_transactions_count": app.pending_transactions.entry_count(),
"pending_transactions_size": app.pending_transactions.weighted_size(),
"user_cache_count": app.rpc_secret_key_cache.entry_count(),

@@ -26,7 +26,7 @@ use hashbrown::HashMap;
use http::{HeaderValue, StatusCode};
use ipnet::IpNet;
use itertools::Itertools;
use log::warn;
use log::{debug, warn};
use migration::sea_orm::prelude::Uuid;
use migration::sea_orm::{
self, ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, PaginatorTrait, QueryFilter,
@@ -59,7 +59,6 @@ use ulid::Ulid;
/// It is a better UX to just click "login with ethereum" and have the account created if it doesn't exist.
/// We can prompt for an email and payment after they log in.
#[debug_handler]
pub async fn user_login_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
ClientIp(ip): ClientIp,
@@ -216,14 +215,14 @@ pub async fn user_login_post(
let login_nonce = UserBearerToken::from_str(&their_msg.nonce)?;
// fetch the message we gave them from our database
let db_conn = app.db_conn().context("Getting database connection")?;
let db_replica = app.db_replica().context("Getting database connection")?;
// massage type for the db
let login_nonce_uuid: Uuid = login_nonce.clone().into();
let user_pending_login = pending_login::Entity::find()
.filter(pending_login::Column::Nonce.eq(login_nonce_uuid))
.one(&db_conn)
.one(db_replica.conn())
.await
.context("database error while finding pending_login")?
.context("login nonce not found")?;
@@ -247,6 +246,20 @@ pub async fn user_login_post(
.verify_eip191(&their_sig)
.context("verifying eip191 signature against our local message")
{
let db_conn = app
.db_conn()
.context("deleting expired pending logins requires a db")?;
// delete ALL expired rows.
let now = Utc::now();
let delete_result = pending_login::Entity::delete_many()
.filter(pending_login::Column::ExpiresAt.lte(now))
.exec(&db_conn)
.await?;
// TODO: emit a stat? if this is high something weird might be happening
debug!("cleared expired pending_logins: {:?}", delete_result);
return Err(anyhow::anyhow!(
"both the primary and eip191 verification failed: {:#?}; {:#?}",
err_1,
@@ -259,10 +272,12 @@ pub async fn user_login_post(
// TODO: limit columns or load whole user?
let u = user::Entity::find()
.filter(user::Column::Address.eq(our_msg.address.as_ref()))
.one(&db_conn)
.one(db_replica.conn())
.await
.unwrap();
let db_conn = app.db_conn().context("login requires a db")?;
let (u, uks, status_code) = match u {
None => {
// user does not exist yet
@@ -316,7 +331,7 @@ pub async fn user_login_post(
// the user is already registered
let uks = rpc_key::Entity::find()
.filter(rpc_key::Column::UserId.eq(u.id))
.all(&db_conn)
.all(db_replica.conn())
.await
.context("failed loading user's key")?;
@@ -385,9 +400,27 @@ pub async fn user_logout_post(
.exec(&db_conn)
.await
{
warn!("Failed to delete {}: {}", user_bearer.redis_key(), err);
debug!("Failed to delete {}: {}", user_bearer.redis_key(), err);
}
let now = Utc::now();
// also delete any expired logins
let delete_result = login::Entity::delete_many()
.filter(login::Column::ExpiresAt.lte(now))
.exec(&db_conn)
.await;
debug!("Deleted expired logins: {:?}", delete_result);
// also delete any expired pending logins
let delete_result = pending_login::Entity::delete_many()
.filter(pending_login::Column::ExpiresAt.lte(now))
.exec(&db_conn)
.await;
debug!("Deleted expired pending logins: {:?}", delete_result);
// TODO: what should the response be? probably json something
Ok("goodbye".into_response())
}
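The same three-statement cleanup now appears in both the login and logout paths. A hypothetical helper (an assumption, not something this commit adds; imports as already present in this file) could consolidate it:

async fn delete_expired_logins(db_conn: &DatabaseConnection) -> anyhow::Result<()> {
    let now = Utc::now();

    // delete ALL expired rows, not just the current user's
    let delete_result = login::Entity::delete_many()
        .filter(login::Column::ExpiresAt.lte(now))
        .exec(db_conn)
        .await?;

    // TODO: emit a stat? a high count might mean something weird is happening
    debug!("cleared expired logins: {:?}", delete_result);

    Ok(())
}

Both call sites would then shrink to delete_expired_logins(&db_conn).await?.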
@@ -398,7 +431,6 @@ pub async fn user_logout_post(
///
/// TODO: this will change as we add better support for secondary users.
#[debug_handler]
pub async fn user_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer_token)): TypedHeader<Authorization<Bearer>>,
@@ -416,7 +448,6 @@ pub struct UserPost {
/// `POST /user` -- modify the account connected to the bearer token in the `Authentication` header.
#[debug_handler]
pub async fn user_post(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer_token)): TypedHeader<Authorization<Bearer>>,
@@ -463,7 +494,6 @@ pub async fn user_post(
/// TODO: one key per request? maybe /user/balance/:rpc_key?
/// TODO: this will change as we add better support for secondary users.
#[debug_handler]
pub async fn user_balance_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
@@ -494,18 +524,19 @@ pub async fn user_balance_post(
///
/// TODO: one key per request? maybe /user/keys/:rpc_key?
#[debug_handler]
pub async fn rpc_keys_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
) -> FrontendResult {
let (user, _semaphore) = app.bearer_is_authorized(bearer).await?;
let db_conn = app.db_conn().context("getting db to fetch user's keys")?;
let db_replica = app
.db_replica()
.context("getting db to fetch user's keys")?;
let uks = rpc_key::Entity::find()
.filter(rpc_key::Column::UserId.eq(user.id))
.all(&db_conn)
.all(db_replica.conn())
.await
.context("failed loading user's key")?;
@@ -523,7 +554,6 @@ pub async fn rpc_keys_get(
/// `DELETE /user/keys` -- Use a bearer token to delete an existing key.
#[debug_handler]
pub async fn rpc_keys_delete(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
@@ -564,14 +594,14 @@ pub async fn rpc_keys_management(
let (user, _semaphore) = app.bearer_is_authorized(bearer).await?;
let db_conn = app.db_conn().context("getting db for user's keys")?;
let db_replica = app.db_replica().context("getting db for user's keys")?;
let mut uk = if let Some(existing_key_id) = payload.key_id {
// get the key and make sure it belongs to the user
rpc_key::Entity::find()
.filter(rpc_key::Column::UserId.eq(user.id))
.filter(rpc_key::Column::Id.eq(existing_key_id))
.one(&db_conn)
.one(db_replica.conn())
.await
.context("failed loading user's key")?
.context("key does not exist or is not controlled by this bearer token")?
@@ -712,6 +742,8 @@ pub async fn rpc_keys_management(
}
let uk = if uk.is_changed() {
let db_conn = app.db_conn().context("login requires a db")?;
uk.save(&db_conn).await.context("Failed saving user key")?
} else {
uk
@@ -745,11 +777,13 @@ pub async fn user_revert_logs_get(
response.insert("chain_id", json!(chain_id));
response.insert("query_start", json!(query_start.timestamp() as u64));
let db_conn = app.db_conn().context("getting db for user's revert logs")?;
let db_replica = app
.db_replica()
.context("getting replica db for user's revert logs")?;
let uks = rpc_key::Entity::find()
.filter(rpc_key::Column::UserId.eq(user.id))
.all(&db_conn)
.all(db_replica.conn())
.await
.context("failed loading user's key")?;
@@ -772,7 +806,7 @@ pub async fn user_revert_logs_get(
// query the database for number of items and pages
let pages_result = q
.clone()
.paginate(&db_conn, page_size)
.paginate(db_replica.conn(), page_size)
.num_items_and_pages()
.await?;
@@ -780,7 +814,10 @@ pub async fn user_revert_logs_get(
response.insert("num_pages", pages_result.number_of_pages.into());
// query the database for the revert logs
let revert_logs = q.paginate(&db_conn, page_size).fetch_page(page).await?;
let revert_logs = q
.paginate(db_replica.conn(), page_size)
.fetch_page(page)
.await?;
response.insert("revert_logs", json!(revert_logs));

@@ -1,3 +1,4 @@
use crate::app::DatabaseReplica;
use crate::frontend::errors::FrontendErrorResponse;
use crate::{app::Web3ProxyApp, user_token::UserBearerToken};
use anyhow::Context;
@@ -27,7 +28,8 @@ use serde_json::json;
/// This authenticates that the bearer is allowed to view this user_id's stats
pub async fn get_user_id_from_params(
redis_conn: &mut RedisConnection,
db_conn: DatabaseConnection,
db_conn: &DatabaseConnection,
db_replica: &DatabaseReplica,
// this is a long type. should we strip it down?
bearer: Option<TypedHeader<Authorization<Bearer>>>,
params: &HashMap<String, String>,
@@ -49,24 +51,23 @@ pub async fn get_user_id_from_params(
let user_login = login::Entity::find()
.filter(login::Column::BearerToken.eq(user_bearer_token.uuid()))
.one(&db_conn)
.one(db_replica.conn())
.await
.context("database error while querying for user")?
.ok_or(FrontendErrorResponse::AccessDenied)?;
// check expiration. if expired, delete ALL expired pending_logins
// if expired, delete ALL expired logins
let now = Utc::now();
if now > user_login.expires_at {
// this row is expired! do not allow auth!
// delete ALL expired rows.
// delete ALL expired logins.
let delete_result = login::Entity::delete_many()
.filter(login::Column::ExpiresAt.lte(now))
.exec(&db_conn)
.exec(db_conn)
.await?;
// TODO: emit a stat? if this is high something weird might be happening
debug!("cleared expired pending_logins: {:?}", delete_result);
debug!("cleared expired logins: {:?}", delete_result);
return Err(FrontendErrorResponse::AccessDenied);
}
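Note why get_user_id_from_params now takes both connections: the bearer-token lookup reads from db_replica, while the expired-row delete_many above writes through the primary db_conn.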
@@ -260,11 +261,18 @@ pub async fn query_user_stats<'a>(
params: &'a HashMap<String, String>,
stat_response_type: StatResponse,
) -> Result<Response, FrontendErrorResponse> {
let db_conn = app.db_conn().context("connecting to db")?;
let mut redis_conn = app.redis_conn().await.context("connecting to redis")?;
let db_conn = app.db_conn().context("query_user_stats needs a db")?;
let db_replica = app
.db_replica()
.context("query_user_stats needs a db replica")?;
let mut redis_conn = app
.redis_conn()
.await
.context("query_user_stats needs a redis")?;
// get the user id first. if it is 0, we should use a cache on the app
let user_id = get_user_id_from_params(&mut redis_conn, db_conn.clone(), bearer, params).await?;
let user_id =
get_user_id_from_params(&mut redis_conn, &db_conn, &db_replica, bearer, params).await?;
// get the query window seconds now so that we can pick a cache with a good TTL
// TODO: for now though, just do one cache. its easier
let query_window_seconds = get_query_window_seconds_from_params(params)?;
@@ -307,7 +315,7 @@ pub async fn query_user_stats<'a>(
.expect("max-age should always parse"),
);
info!("served resposne from cache");
// TODO: emit a stat
return Ok(response);
}
@@ -435,7 +443,7 @@ pub async fn query_user_stats<'a>(
// query the database for number of items and pages
let pages_result = q
.clone()
.paginate(&db_conn, page_size)
.paginate(db_replica.conn(), page_size)
.num_items_and_pages()
.await?;
@@ -445,7 +453,7 @@ pub async fn query_user_stats<'a>(
// query the database (todo: combine with the pages_result query?)
let query_response = q
.into_json()
.paginate(&db_conn, page_size)
.paginate(db_replica.conn(), page_size)
.fetch_page(page)
.await?;