From 1465ee355cbb108859f33a8fee7a9e9e368e0291 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Fri, 16 Dec 2022 00:48:24 -0800
Subject: [PATCH] add support for optional db replica

also add cleanup of expired login data
---
 TODO.md                                  |  3 +
 config/example.toml                      |  3 +
 web3_proxy/src/app/mod.rs                | 56 +++++++++++++++--
 web3_proxy/src/config.rs                 | 11 ++++
 web3_proxy/src/frontend/authorization.rs | 17 +++---
 web3_proxy/src/frontend/status.rs        |  4 +-
 web3_proxy/src/frontend/users.rs         | 77 ++++++++++++++++++------
 web3_proxy/src/user_queries.rs           | 34 +++++++----
 8 files changed, 157 insertions(+), 48 deletions(-)

diff --git a/TODO.md b/TODO.md
index 758db189..72ec44d6 100644
--- a/TODO.md
+++ b/TODO.md
@@ -261,6 +261,8 @@ These are roughly in order of completion
 
 These are not yet ordered. There might be duplicates. We might not actually need all of these.
 
+- [x] cache user stats in redis and with headers
+- [x] optional read-only database connection
 - [ ] rate limiting/throttling on query_user_stats
 - [ ] minimum allowed query_start on query_user_stats
 - [ ] query_user_stats cache hit rate
@@ -573,3 +575,4 @@ in another repo: event subscriber
 - [ ] if it is too long (the last 4 bytes must be zero), give an error so descriptions like this stand out
 - [ ] we need to use docker-compose's proper environment variable handling. because now if someone tries to start dev containers in their prod, remove orphans stops and removes them
 - [ ] change invite codes to set the user_tier
+- [ ] some cli commands should use the replica if possible
\ No newline at end of file
diff --git a/config/example.toml b/config/example.toml
index 9c396eca..729bd083 100644
--- a/config/example.toml
+++ b/config/example.toml
@@ -8,6 +8,9 @@ db_max_connections = 99
 # production runs inside docker and so uses "mysql://root:web3_proxy@db:3306/web3_proxy" for db_url
 db_url = "mysql://root:dev_web3_proxy@127.0.0.1:13306/dev_web3_proxy"
 
+# read-only replica useful when running the proxy in multiple regions
+db_replica_url = "mysql://root:dev_web3_proxy@127.0.0.1:13306/dev_web3_proxy"
+
 # thundering herd protection
 # only mark a block as the head block if the sum of their soft limits is greater than or equal to min_sum_soft_limit
 min_sum_soft_limit = 2_000
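The example config only sets `db_replica_url`, but the replica pool can also be sized independently of the primary. A minimal sketch of the extra knobs defined in `web3_proxy/src/config.rs` below; the hostname and numbers are illustrative, and both pool-size settings fall back to the primary's `db_min_connections`/`db_max_connections` when unset:

```toml
# hypothetical multi-region setup; only db_replica_url is required to
# enable the replica, the pool sizes are optional
db_replica_url = "mysql://root:dev_web3_proxy@replica.example:3306/dev_web3_proxy"

# optional; defaults to db_min_connections when unset
db_replica_min_connections = 5

# optional; defaults to db_max_connections when unset
db_replica_max_connections = 99
```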
diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs
index 528d4376..7257a9e8 100644
--- a/web3_proxy/src/app/mod.rs
+++ b/web3_proxy/src/app/mod.rs
@@ -93,6 +93,18 @@ pub struct AuthorizationChecks {
     pub log_revert_chance: f64,
 }
 
+/// Simple wrapper so that we can keep track of read-only connections.
+/// Note: the compiler does nothing to prevent writes through this connection!
+#[derive(Clone)]
+pub struct DatabaseReplica(pub DatabaseConnection);
+
+// TODO: I feel like we could do something smart with Deref or AsRef or Borrow, but that wasn't working for me
+impl DatabaseReplica {
+    pub fn conn(&self) -> &DatabaseConnection {
+        &self.0
+    }
+}
+
 /// The application
 // TODO: this debug impl is way too verbose. make something smaller
 // TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard
@@ -108,6 +120,7 @@ pub struct Web3ProxyApp {
     pending_tx_sender: broadcast::Sender<TxStatus>,
     pub config: AppConfig,
     pub db_conn: Option<DatabaseConnection>,
+    pub db_replica: Option<DatabaseReplica>,
     /// prometheus metrics
     app_metrics: Arc<Web3ProxyAppMetrics>,
     open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
@@ -269,8 +282,11 @@ impl Web3ProxyApp {
         let app_metrics = Default::default();
         let open_request_handle_metrics: Arc<OpenRequestHandleMetrics> = Default::default();
 
+        let mut db_conn = None::<DatabaseConnection>;
+        let mut db_replica = None::<DatabaseReplica>;
+
         // connect to mysql and make sure the latest migrations have run
-        let db_conn = if let Some(db_url) = top_config.app.db_url.clone() {
+        if let Some(db_url) = top_config.app.db_url.clone() {
             let db_min_connections = top_config
                 .app
                 .db_min_connections
@@ -282,12 +298,39 @@ impl Web3ProxyApp {
                 .db_max_connections
                 .unwrap_or(db_min_connections * 2);
 
-            let db_conn = get_migrated_db(db_url, db_min_connections, db_max_connections).await?;
+            db_conn = Some(get_migrated_db(db_url, db_min_connections, db_max_connections).await?);
 
-            Some(db_conn)
+            db_replica = if let Some(db_replica_url) = top_config.app.db_replica_url.clone() {
+                let db_replica_min_connections = top_config
+                    .app
+                    .db_replica_min_connections
+                    .unwrap_or(db_min_connections);
+
+                let db_replica_max_connections = top_config
+                    .app
+                    .db_replica_max_connections
+                    .unwrap_or(db_max_connections);
+
+                let db_replica = get_db(
+                    db_replica_url,
+                    db_replica_min_connections,
+                    db_replica_max_connections,
+                )
+                .await?;
+
+                Some(DatabaseReplica(db_replica))
+            } else {
+                // just clone so that we don't need a bunch of checks all over our code
+                db_conn.clone().map(DatabaseReplica)
+            };
         } else {
+            if top_config.app.db_replica_url.is_some() {
+                return Err(anyhow::anyhow!(
+                    "if there is a db_replica_url, there must be a db_url"
+                ));
+            }
+
             warn!("no database. some features will be disabled");
-            None
         };
 
         let balanced_rpcs = top_config.balanced_rpcs;
@@ -570,6 +613,7 @@ impl Web3ProxyApp {
             frontend_key_rate_limiter,
             login_rate_limiter,
             db_conn,
+            db_replica,
             vredis_pool,
             app_metrics,
             open_request_handle_metrics,
@@ -677,6 +721,10 @@ impl Web3ProxyApp {
         self.db_conn.clone()
     }
 
+    pub fn db_replica(&self) -> Option<DatabaseReplica> {
+        self.db_replica.clone()
+    }
+
     pub async fn redis_conn(&self) -> anyhow::Result<redis_rate_limiter::RedisConnection> {
         match self.vredis_pool.as_ref() {
             None => Err(anyhow::anyhow!("no redis server configured")),
diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs
index c884082e..7a81bc20 100644
--- a/web3_proxy/src/config.rs
+++ b/web3_proxy/src/config.rs
@@ -71,6 +71,17 @@ pub struct AppConfig {
     /// If none, the minimum * 2 is used.
     pub db_max_connections: Option<u32>,
 
+    /// Read-only replica of db_url.
+    pub db_replica_url: Option<String>,
+
+    /// minimum size of the connection pool for the database replica.
+    /// If none, db_min_connections is used.
+    pub db_replica_min_connections: Option<u32>,
+
+    /// maximum size of the connection pool for the database replica.
+    /// If none, db_max_connections is used.
+    pub db_replica_max_connections: Option<u32>,
+
     /// Default request limit for registered users.
     /// 0 = block all requests
     /// None = allow all requests
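The TODO on `DatabaseReplica` above wonders about `Deref`/`AsRef`/`Borrow`. For what it's worth, a plain `AsRef` impl compiles and would let call sites pass `db_replica.as_ref()` anywhere a `&DatabaseConnection` is expected; this is a speculative sketch, not part of the patch, and the explicit `.conn()` arguably reads more clearly at query sites anyway:

```rust
use migration::sea_orm::DatabaseConnection;

// speculative: expose the inner pool through the standard conversion
// trait instead of (or in addition to) the conn() method
impl AsRef<DatabaseConnection> for DatabaseReplica {
    fn as_ref(&self) -> &DatabaseConnection {
        &self.0
    }
}
```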
diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs
index 24962a64..ddb9c987 100644
--- a/web3_proxy/src/frontend/authorization.rs
+++ b/web3_proxy/src/frontend/authorization.rs
@@ -54,6 +54,7 @@ pub enum AuthorizationType {
 #[derive(Clone, Debug)]
 pub struct Authorization {
     pub checks: AuthorizationChecks,
+    // TODO: instead of the conn, have a channel?
     pub db_conn: Option<DatabaseConnection>,
     pub ip: IpAddr,
     pub origin: Option<Origin>,
@@ -437,8 +438,8 @@ impl Web3ProxyApp {
         let semaphore_permit = semaphore.acquire_owned().await?;
 
         // get the attached address from the database for the given auth_token.
-        let db_conn = self
-            .db_conn()
+        let db_replica = self
+            .db_replica()
             .context("checking if bearer token is authorized")?;
 
         let user_bearer_uuid: Uuid = user_bearer_token.into();
@@ -446,7 +447,7 @@ impl Web3ProxyApp {
         let user = user::Entity::find()
             .left_join(login::Entity)
             .filter(login::Column::BearerToken.eq(user_bearer_uuid))
-            .one(&db_conn)
+            .one(db_replica.conn())
             .await
             .context("fetching user from db by bearer token")?
             .context("unknown bearer token")?;
@@ -570,9 +571,9 @@ impl Web3ProxyApp {
         let authorization_checks: Result<_, Arc<anyhow::Error>> = self
             .rpc_secret_key_cache
             .try_get_with(rpc_secret_key.into(), async move {
-                // // trace!(?rpc_secret_key, "user cache miss");
+                // trace!(?rpc_secret_key, "user cache miss");
 
-                let db_conn = self.db_conn().context("Getting database connection")?;
+                let db_replica = self.db_replica().context("Getting database connection")?;
 
                 let rpc_secret_key: Uuid = rpc_secret_key.into();
 
@@ -582,20 +583,20 @@ impl Web3ProxyApp {
                 match rpc_key::Entity::find()
                     .filter(rpc_key::Column::SecretKey.eq(rpc_secret_key))
                     .filter(rpc_key::Column::Active.eq(true))
-                    .one(&db_conn)
+                    .one(db_replica.conn())
                     .await?
                 {
                     Some(rpc_key_model) => {
                         // TODO: move these splits into helper functions
                         // TODO: can we have sea orm handle this for us?
                         let user_model = user::Entity::find_by_id(rpc_key_model.user_id)
-                            .one(&db_conn)
+                            .one(db_replica.conn())
                             .await?
                             .expect("related user");
 
                         let user_tier_model =
                             user_tier::Entity::find_by_id(user_model.user_tier_id)
-                                .one(&db_conn)
+                                .one(db_replica.conn())
                                 .await?
                                 .expect("related user tier");
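Everything the hot authorization path needs (bearer tokens, rpc keys, users, user tiers) is now read through the replica, and the moka-style cache in front of it means a hot key only touches the database on a cache miss. A sketch of the calling convention this establishes for new handler code; `load_user` is a hypothetical helper, and the `entities` crate and accessors are the ones used above:

```rust
use anyhow::Context;
use entities::user;
use migration::sea_orm::EntityTrait;

use crate::app::Web3ProxyApp;

// hypothetical helper showing the split the patch establishes: reads take
// db_replica() (a clone of the primary pool when no replica is configured),
// while anything that writes must take db_conn() instead
async fn load_user(app: &Web3ProxyApp, user_id: u64) -> anyhow::Result<Option<user::Model>> {
    let db_replica = app
        .db_replica()
        .context("loading a user requires a db")?;

    // read-only query, so the replica is always safe here
    let u = user::Entity::find_by_id(user_id)
        .one(db_replica.conn())
        .await?;

    Ok(u)
}
```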
diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs
index ceda6e95..f8853821 100644
--- a/web3_proxy/src/frontend/status.rs
+++ b/web3_proxy/src/frontend/status.rs
@@ -13,7 +13,6 @@ use std::sync::Arc;
 
 /// Health check page for load balancers to use.
 #[debug_handler]
-
 pub async fn health(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
     // TODO: also check that the head block is not too old
     if app.balanced_rpcs.synced() {
@@ -27,7 +26,6 @@ pub async fn health(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
 ///
 /// TODO: when done debugging, remove this and only allow access on a different port
 #[debug_handler]
-
 pub async fn prometheus(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
     app.prometheus_metrics()
 }
@@ -36,7 +34,6 @@ pub async fn prometheus(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
 ///
 /// TODO: replace this with proper stats and monitoring
 #[debug_handler]
-
 pub async fn status(
     Extension(app): Extension<Arc<Web3ProxyApp>>,
     Extension(response_cache): Extension,
@@ -48,6 +45,7 @@ pub async fn status(
     // TODO: what else should we include? uptime, cache hit rates, cpu load
     let body = json!({
+        "chain_id": app.config.chain_id,
         "pending_transactions_count": app.pending_transactions.entry_count(),
         "pending_transactions_size": app.pending_transactions.weighted_size(),
         "user_cache_count": app.rpc_secret_key_cache.entry_count(),
diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs
index 9bfb8a45..e9f1e7dc 100644
--- a/web3_proxy/src/frontend/users.rs
+++ b/web3_proxy/src/frontend/users.rs
@@ -26,7 +26,7 @@ use hashbrown::HashMap;
 use http::{HeaderValue, StatusCode};
 use ipnet::IpNet;
 use itertools::Itertools;
-use log::warn;
+use log::{debug, warn};
 use migration::sea_orm::prelude::Uuid;
 use migration::sea_orm::{
     self, ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, PaginatorTrait, QueryFilter,
@@ -59,7 +59,6 @@ use ulid::Ulid;
 /// It is a better UX to just click "login with ethereum" and have the account created if it doesn't exist.
 /// We can prompt for an email and payment after they log in.
 #[debug_handler]
-
 pub async fn user_login_get(
     Extension(app): Extension<Arc<Web3ProxyApp>>,
     ClientIp(ip): ClientIp,
@@ -216,14 +215,14 @@ pub async fn user_login_post(
     let login_nonce = UserBearerToken::from_str(&their_msg.nonce)?;
 
     // fetch the message we gave them from our database
-    let db_conn = app.db_conn().context("Getting database connection")?;
+    let db_replica = app.db_replica().context("Getting database connection")?;
 
     // massage type for the db
     let login_nonce_uuid: Uuid = login_nonce.clone().into();
 
     let user_pending_login = pending_login::Entity::find()
         .filter(pending_login::Column::Nonce.eq(login_nonce_uuid))
-        .one(&db_conn)
+        .one(db_replica.conn())
         .await
         .context("database error while finding pending_login")?
         .context("login nonce not found")?;
@@ -247,6 +246,20 @@ pub async fn user_login_post(
         .verify_eip191(&their_sig)
         .context("verifying eip191 signature against our local message")
     {
+        let db_conn = app
+            .db_conn()
+            .context("deleting expired pending logins requires a db")?;
+
+        // delete ALL expired rows.
+        let now = Utc::now();
+        let delete_result = pending_login::Entity::delete_many()
+            .filter(pending_login::Column::ExpiresAt.lte(now))
+            .exec(&db_conn)
+            .await?;
+
+        // TODO: emit a stat? if this is high, something weird might be happening
+        debug!("cleared expired pending_logins: {:?}", delete_result);
+
         return Err(anyhow::anyhow!(
             "both the primary and eip191 verification failed: {:#?}; {:#?}",
             err_1,
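Expired `pending_login` rows are only swept when someone fails signature verification, which is opportunistic: a quiet deployment could accumulate rows indefinitely. A hypothetical alternative (not in this patch) is a periodic sweeper task; `sweep_expired_pending_logins` and the 10-minute interval are illustrative:

```rust
use std::time::Duration;

use entities::pending_login;
use migration::sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};

// hypothetical background task: delete expired rows on a timer instead of
// (or in addition to) the failed-login path above
async fn sweep_expired_pending_logins(db_conn: DatabaseConnection) -> anyhow::Result<()> {
    let mut ticker = tokio::time::interval(Duration::from_secs(600));

    loop {
        ticker.tick().await;

        let now = chrono::Utc::now();

        let delete_result = pending_login::Entity::delete_many()
            .filter(pending_login::Column::ExpiresAt.lte(now))
            .exec(&db_conn)
            .await?;

        // mirrors the debug! logging used by the opportunistic path
        log::debug!("swept expired pending_logins: {:?}", delete_result);
    }
}
```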
@@ -259,10 +272,12 @@ pub async fn user_login_post(
     // TODO: limit columns or load whole user?
     let u = user::Entity::find()
         .filter(user::Column::Address.eq(our_msg.address.as_ref()))
-        .one(&db_conn)
+        .one(db_replica.conn())
         .await
         .unwrap();
 
+    let db_conn = app.db_conn().context("login requires a db")?;
+
     let (u, uks, status_code) = match u {
         None => {
             // user does not exist yet
@@ -316,7 +331,7 @@ pub async fn user_login_post(
             // the user is already registered
             let uks = rpc_key::Entity::find()
                 .filter(rpc_key::Column::UserId.eq(u.id))
-                .all(&db_conn)
+                .all(db_replica.conn())
                 .await
                 .context("failed loading user's key")?;
@@ -385,9 +400,27 @@ pub async fn user_logout_post(
         .exec(&db_conn)
         .await
     {
-        warn!("Failed to delete {}: {}", user_bearer.redis_key(), err);
+        debug!("Failed to delete {}: {}", user_bearer.redis_key(), err);
     }
 
+    let now = Utc::now();
+
+    // also delete any expired logins
+    let delete_result = login::Entity::delete_many()
+        .filter(login::Column::ExpiresAt.lte(now))
+        .exec(&db_conn)
+        .await;
+
+    debug!("Deleted expired logins: {:?}", delete_result);
+
+    // also delete any expired pending logins
+    let delete_result = pending_login::Entity::delete_many()
+        .filter(pending_login::Column::ExpiresAt.lte(now))
+        .exec(&db_conn)
+        .await;
+
+    debug!("Deleted expired pending logins: {:?}", delete_result);
+
     // TODO: what should the response be? probably json something
     Ok("goodbye".into_response())
 }
@@ -398,7 +431,6 @@ pub async fn user_logout_post(
 ///
 /// TODO: this will change as we add better support for secondary users.
 #[debug_handler]
-
 pub async fn user_get(
     Extension(app): Extension<Arc<Web3ProxyApp>>,
     TypedHeader(Authorization(bearer_token)): TypedHeader<Authorization<Bearer>>,
@@ -416,7 +448,6 @@ pub struct UserPost {
 
 /// `POST /user` -- modify the account connected to the bearer token in the `Authentication` header.
 #[debug_handler]
-
 pub async fn user_post(
     Extension(app): Extension<Arc<Web3ProxyApp>>,
     TypedHeader(Authorization(bearer_token)): TypedHeader<Authorization<Bearer>>,
@@ -463,7 +494,6 @@ pub async fn user_post(
 /// TODO: one key per request? maybe /user/balance/:rpc_key?
 /// TODO: this will change as we add better support for secondary users.
 #[debug_handler]
-
 pub async fn user_balance_get(
     Extension(app): Extension<Arc<Web3ProxyApp>>,
     TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
@@ -494,18 +524,19 @@ pub async fn user_balance_post(
 ///
 /// TODO: one key per request? maybe /user/keys/:rpc_key?
 #[debug_handler]
-
 pub async fn rpc_keys_get(
     Extension(app): Extension<Arc<Web3ProxyApp>>,
     TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
 ) -> FrontendResult {
     let (user, _semaphore) = app.bearer_is_authorized(bearer).await?;
 
-    let db_conn = app.db_conn().context("getting db to fetch user's keys")?;
+    let db_replica = app
+        .db_replica()
+        .context("getting db to fetch user's keys")?;
 
     let uks = rpc_key::Entity::find()
         .filter(rpc_key::Column::UserId.eq(user.id))
-        .all(&db_conn)
+        .all(db_replica.conn())
         .await
         .context("failed loading user's key")?;
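One caveat the move to `db_replica` introduces: handlers like `rpc_keys_get` no longer read their own writes, so a key created through the primary an instant earlier may not be visible until replication catches up. A hypothetical guard (not in the patch) that falls back to the primary when the replica returns nothing:

```rust
use anyhow::Context;
use entities::rpc_key;
use migration::sea_orm::{ColumnTrait, EntityTrait, QueryFilter};

use crate::app::Web3ProxyApp;

// hypothetical read-your-writes fallback: trust the replica unless it
// comes back empty, in which case double-check against the primary
async fn find_keys_fresh(app: &Web3ProxyApp, user_id: u64) -> anyhow::Result<Vec<rpc_key::Model>> {
    let db_replica = app.db_replica().context("fetching keys requires a db")?;

    let uks = rpc_key::Entity::find()
        .filter(rpc_key::Column::UserId.eq(user_id))
        .all(db_replica.conn())
        .await?;

    if !uks.is_empty() {
        return Ok(uks);
    }

    // the replica might just be lagging; the primary is authoritative
    let db_conn = app.db_conn().context("fetching keys requires a db")?;

    rpc_key::Entity::find()
        .filter(rpc_key::Column::UserId.eq(user_id))
        .all(&db_conn)
        .await
        .context("failed loading user's keys from the primary")
}
```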
@@ -523,7 +554,6 @@ pub async fn rpc_keys_get(
 
 /// `DELETE /user/keys` -- Use a bearer token to delete an existing key.
 #[debug_handler]
-
 pub async fn rpc_keys_delete(
     Extension(app): Extension<Arc<Web3ProxyApp>>,
     TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
@@ -564,14 +594,14 @@ pub async fn rpc_keys_management(
     let (user, _semaphore) = app.bearer_is_authorized(bearer).await?;
 
-    let db_conn = app.db_conn().context("getting db for user's keys")?;
+    let db_replica = app.db_replica().context("getting db for user's keys")?;
 
     let mut uk = if let Some(existing_key_id) = payload.key_id {
         // get the key and make sure it belongs to the user
         rpc_key::Entity::find()
             .filter(rpc_key::Column::UserId.eq(user.id))
             .filter(rpc_key::Column::Id.eq(existing_key_id))
-            .one(&db_conn)
+            .one(db_replica.conn())
             .await
             .context("failed loading user's key")?
             .context("key does not exist or is not controlled by this bearer token")?
@@ -712,6 +742,8 @@ pub async fn rpc_keys_management(
     }
 
     let uk = if uk.is_changed() {
+        let db_conn = app.db_conn().context("saving a key requires a db")?;
+
         uk.save(&db_conn).await.context("Failed saving user key")?
     } else {
         uk
@@ -745,11 +777,13 @@ pub async fn user_revert_logs_get(
     response.insert("chain_id", json!(chain_id));
     response.insert("query_start", json!(query_start.timestamp() as u64));
 
-    let db_conn = app.db_conn().context("getting db for user's revert logs")?;
+    let db_replica = app
+        .db_replica()
+        .context("getting replica db for user's revert logs")?;
 
     let uks = rpc_key::Entity::find()
         .filter(rpc_key::Column::UserId.eq(user.id))
-        .all(&db_conn)
+        .all(db_replica.conn())
         .await
         .context("failed loading user's key")?;
@@ -772,7 +806,7 @@ pub async fn user_revert_logs_get(
     // query the database for number of items and pages
     let pages_result = q
         .clone()
-        .paginate(&db_conn, page_size)
+        .paginate(db_replica.conn(), page_size)
         .num_items_and_pages()
         .await?;
 
@@ -780,7 +814,10 @@ pub async fn user_revert_logs_get(
     response.insert("num_pages", pages_result.number_of_pages.into());
 
     // query the database for the revert logs
-    let revert_logs = q.paginate(&db_conn, page_size).fetch_page(page).await?;
+    let revert_logs = q
+        .paginate(db_replica.conn(), page_size)
+        .fetch_page(page)
+        .await?;
 
     response.insert("revert_logs", json!(revert_logs));
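Both `user_revert_logs_get` above and `query_user_stats` below clone the whole select just to count it (`q.clone().paginate(...)`), and the code's own TODO asks about combining the two queries. A sketch under the sea-orm paginator API used here: it still issues a COUNT plus a SELECT, but a single paginator serves both, so the clone goes away:

```rust
// sketch: reuse one paginator for the count and the page fetch
let paginator = q.paginate(db_replica.conn(), page_size);

let pages_result = paginator.num_items_and_pages().await?;

response.insert("num_items", pages_result.number_of_items.into());
response.insert("num_pages", pages_result.number_of_pages.into());

let revert_logs = paginator.fetch_page(page).await?;

response.insert("revert_logs", json!(revert_logs));
```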
diff --git a/web3_proxy/src/user_queries.rs b/web3_proxy/src/user_queries.rs
index ffffe5e3..c3e16177 100644
--- a/web3_proxy/src/user_queries.rs
+++ b/web3_proxy/src/user_queries.rs
@@ -1,3 +1,4 @@
+use crate::app::DatabaseReplica;
 use crate::frontend::errors::FrontendErrorResponse;
 use crate::{app::Web3ProxyApp, user_token::UserBearerToken};
 use anyhow::Context;
@@ -27,7 +28,8 @@
 /// This authenticates that the bearer is allowed to view this user_id's stats
 pub async fn get_user_id_from_params(
     redis_conn: &mut RedisConnection,
-    db_conn: DatabaseConnection,
+    db_conn: &DatabaseConnection,
+    db_replica: &DatabaseReplica,
     // this is a long type. should we strip it down?
     bearer: Option<TypedHeader<Authorization<Bearer>>>,
     params: &HashMap<String, String>,
@@ -49,24 +51,23 @@ pub async fn get_user_id_from_params(
             let user_login = login::Entity::find()
                 .filter(login::Column::BearerToken.eq(user_bearer_token.uuid()))
-                .one(&db_conn)
+                .one(db_replica.conn())
                 .await
                 .context("database error while querying for user")?
                 .ok_or(FrontendErrorResponse::AccessDenied)?;
 
-            // check expiration. if expired, delete ALL expired pending_logins
+            // if expired, delete ALL expired logins
             let now = Utc::now();
-
             if now > user_login.expires_at {
                 // this row is expired! do not allow auth!
-                // delete ALL expired rows.
+                // delete ALL expired logins.
                 let delete_result = login::Entity::delete_many()
                     .filter(login::Column::ExpiresAt.lte(now))
-                    .exec(&db_conn)
+                    .exec(db_conn)
                     .await?;
 
                 // TODO: emit a stat? if this is high, something weird might be happening
-                debug!("cleared expired pending_logins: {:?}", delete_result);
+                debug!("cleared expired logins: {:?}", delete_result);
 
                 return Err(FrontendErrorResponse::AccessDenied);
             }
@@ -260,11 +261,18 @@ pub async fn query_user_stats<'a>(
     params: &'a HashMap<String, String>,
     stat_response_type: StatResponse,
 ) -> Result<Response, FrontendErrorResponse> {
-    let db_conn = app.db_conn().context("connecting to db")?;
-    let mut redis_conn = app.redis_conn().await.context("connecting to redis")?;
+    let db_conn = app.db_conn().context("query_user_stats needs a db")?;
+    let db_replica = app
+        .db_replica()
+        .context("query_user_stats needs a db replica")?;
+    let mut redis_conn = app
+        .redis_conn()
+        .await
+        .context("query_user_stats needs a redis")?;
 
     // get the user id first. if it is 0, we should use a cache on the app
-    let user_id = get_user_id_from_params(&mut redis_conn, db_conn.clone(), bearer, params).await?;
+    let user_id =
+        get_user_id_from_params(&mut redis_conn, &db_conn, &db_replica, bearer, params).await?;
     // get the query window seconds now so that we can pick a cache with a good TTL
     // TODO: for now though, just do one cache. its easier
     let query_window_seconds = get_query_window_seconds_from_params(params)?;
@@ -307,7 +315,7 @@ pub async fn query_user_stats<'a>(
                 .expect("max-age should always parse"),
         );
 
-        info!("served resposne from cache");
+        // TODO: emit a stat
 
         return Ok(response);
     }
@@ -435,7 +443,7 @@ pub async fn query_user_stats<'a>(
     // query the database for number of items and pages
     let pages_result = q
         .clone()
-        .paginate(&db_conn, page_size)
+        .paginate(db_replica.conn(), page_size)
         .num_items_and_pages()
         .await?;
 
@@ -445,7 +453,7 @@ pub async fn query_user_stats<'a>(
     // query the database (todo: combine with the pages_result query?)
     let query_response = q
         .into_json()
-        .paginate(&db_conn, page_size)
+        .paginate(db_replica.conn(), page_size)
        .fetch_page(page)
        .await?;
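Taken together, the constructor establishes two invariants that the rest of the patch leans on: `db_replica_url` without `db_url` is a hard config error, and whenever a primary exists, `db_replica()` is `Some` (falling back to a clone of the primary pool). A sketch of those invariants as standalone checks, using only the accessors and config fields added above:

```rust
use crate::app::Web3ProxyApp;
use crate::config::AppConfig;

// mirror of the config check in the app constructor: a replica without a
// primary is a configuration mistake, not a silent no-op
fn check_db_config(config: &AppConfig) -> anyhow::Result<()> {
    if config.db_replica_url.is_some() && config.db_url.is_none() {
        return Err(anyhow::anyhow!(
            "if there is a db_replica_url, there must be a db_url"
        ));
    }

    Ok(())
}

// and the runtime invariant that lets read paths skip "no replica" branches
fn assert_replica_fallback(app: &Web3ProxyApp) {
    if app.db_conn().is_some() {
        // when only db_url is set, db_replica is a clone of the primary pool
        assert!(app.db_replica().is_some());
    }
}
```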