close the db connection on exit

This commit is contained in:
Bryan Stitt 2023-07-13 14:56:17 -07:00
parent a893a41c90
commit 5d5e65ed40
5 changed files with 48 additions and 16 deletions

@ -49,6 +49,7 @@ use std::str::FromStr;
use std::sync::atomic::{AtomicU16, Ordering}; use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::{atomic, Arc}; use std::sync::{atomic, Arc};
use std::time::Duration; use std::time::Duration;
use tokio::runtime::Runtime;
use tokio::sync::{broadcast, mpsc, oneshot, watch, Semaphore}; use tokio::sync::{broadcast, mpsc, oneshot, watch, Semaphore};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio::time::timeout; use tokio::time::timeout;
@ -160,11 +161,31 @@ pub struct Web3ProxyAppSpawn {
/// these are important and must be allowed to finish /// these are important and must be allowed to finish
pub background_handles: FuturesUnordered<Web3ProxyJoinHandle<()>>, pub background_handles: FuturesUnordered<Web3ProxyJoinHandle<()>>,
/// config changes are sent here /// config changes are sent here
pub new_top_config: watch::Sender<TopConfig>, pub new_top_config: Arc<watch::Sender<TopConfig>>,
/// watch this to know when the app is ready to serve requests /// watch this to know when the app is ready to serve requests
pub ranked_rpcs: watch::Receiver<Option<Arc<RankedRpcs>>>, pub ranked_rpcs: watch::Receiver<Option<Arc<RankedRpcs>>>,
} }
impl Drop for Web3ProxyApp {
fn drop(&mut self) {
if let Ok(db_conn) = self.db_conn().cloned() {
/*
From the sqlx docs:
We recommend calling .close().await to gracefully close the pool and its connections when you are done using it.
This will also wake any tasks that are waiting on an .acquire() call,
so for long-lived applications its a good idea to call .close() during shutdown.
*/
let rt = Runtime::new().unwrap();
if let Err(err) = rt.block_on(db_conn.close()) {
error!(?err, "Unable to close db!");
};
}
}
}
impl Web3ProxyApp { impl Web3ProxyApp {
/// The main entrypoint. /// The main entrypoint.
pub async fn spawn( pub async fn spawn(
@ -610,7 +631,7 @@ impl Web3ProxyApp {
app, app,
app_handles, app_handles,
background_handles: important_background_handles, background_handles: important_background_handles,
new_top_config: new_top_config_sender, new_top_config: Arc::new(new_top_config_sender),
ranked_rpcs: consensus_connections_watcher, ranked_rpcs: consensus_connections_watcher,
}) })
} }

@ -174,7 +174,7 @@ impl ProxydSubCommand {
// start the frontend port // start the frontend port
let frontend_handle = tokio::spawn(frontend::serve( let frontend_handle = tokio::spawn(frontend::serve(
spawned_app.app, spawned_app.app.clone(),
frontend_shutdown_receiver, frontend_shutdown_receiver,
frontend_shutdown_complete_sender, frontend_shutdown_complete_sender,
)); ));
@ -286,6 +286,17 @@ impl ProxydSubCommand {
} }
} }
if let Ok(db_conn) = spawned_app.app.db_conn().cloned() {
/*
From the sqlx docs:
We recommend calling .close().await to gracefully close the pool and its connections when you are done using it.
This will also wake any tasks that are waiting on an .acquire() call,
so for long-lived applications its a good idea to call .close() during shutdown.
*/
db_conn.close().await?;
}
if background_errors.is_zero() && !exited_with_err { if background_errors.is_zero() && !exited_with_err {
info!("finished"); info!("finished");
Ok(()) Ok(())

@ -66,9 +66,11 @@ pub async fn create_user_as_admin(
}; };
info!(?admin_status_changer); info!(?admin_status_changer);
let db_conn = db.conn().await;
info!("Changing the status of the admin_wallet to be an admin"); info!("Changing the status of the admin_wallet to be an admin");
// Pass on the database into it ... // Pass on the database into it ...
admin_status_changer.main(db.conn()).await.unwrap(); admin_status_changer.main(&db_conn).await.unwrap();
// Now log him in again, because he was just signed out // Now log him in again, because he was just signed out
// Login the admin again, because he was just signed out // Login the admin again, because he was just signed out

@ -3,14 +3,13 @@ use entities::{user, user_tier};
use ethers::prelude::{LocalWallet, Signer}; use ethers::prelude::{LocalWallet, Signer};
use ethers::types::Signature; use ethers::types::Signature;
use migration::sea_orm::{ use migration::sea_orm::{
self, ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel,
QueryFilter,
}; };
use tracing::info; use tracing::info;
use web3_proxy::errors::Web3ProxyResult; use web3_proxy::errors::Web3ProxyResult;
use web3_proxy::frontend::users::authentication::{LoginPostResponse, PostLogin}; use web3_proxy::frontend::users::authentication::{LoginPostResponse, PostLogin};
use super::mysql::TestMysql;
/// Helper function to create an "ordinary" user /// Helper function to create an "ordinary" user
#[allow(unused)] #[allow(unused)]
pub async fn create_user( pub async fn create_user(
@ -57,12 +56,10 @@ pub async fn create_user(
#[allow(unused)] #[allow(unused)]
pub async fn set_user_tier( pub async fn set_user_tier(
x: &TestApp, x: &TestApp,
db: &TestMysql, db_conn: &DatabaseConnection,
user: user::Model, user: user::Model,
tier_name: &str, tier_name: &str,
) -> Web3ProxyResult<user_tier::Model> { ) -> Web3ProxyResult<user_tier::Model> {
let db_conn = db.conn();
let ut = user_tier::Entity::find() let ut = user_tier::Entity::find()
.filter(user_tier::Column::Title.like(tier_name)) .filter(user_tier::Column::Title.like(tier_name))
.one(db_conn) .one(db_conn)

@ -10,10 +10,11 @@ use tokio::{
use tracing::{info, trace, warn}; use tracing::{info, trace, warn};
use web3_proxy::relational_db::get_migrated_db; use web3_proxy::relational_db::get_migrated_db;
use migration::sea_orm::prelude;
/// on drop, the mysql docker container will be shut down /// on drop, the mysql docker container will be shut down
pub struct TestMysql { pub struct TestMysql {
pub url: Option<String>, pub url: Option<String>,
pub conn: Option<DatabaseConnection>,
pub container_name: String, pub container_name: String,
} }
@ -40,7 +41,6 @@ impl TestMysql {
// create the db_data as soon as the url is known // create the db_data as soon as the url is known
// when this is dropped, the db will be stopped // when this is dropped, the db will be stopped
let mut test_mysql = Self { let mut test_mysql = Self {
conn: None,
container_name: db_container_name.clone(), container_name: db_container_name.clone(),
url: None, url: None,
}; };
@ -150,9 +150,8 @@ impl TestMysql {
} }
match get_migrated_db(db_url.clone(), 1, 1).await { match get_migrated_db(db_url.clone(), 1, 1).await {
Ok(x) => { Ok(_) => {
// it worked! yey! // it worked! yey!
test_mysql.conn = Some(x);
break; break;
} }
Err(err) => { Err(err) => {
@ -168,8 +167,10 @@ impl TestMysql {
test_mysql test_mysql
} }
pub fn conn(&self) -> &DatabaseConnection { pub async fn conn(&self) -> DatabaseConnection {
self.conn.as_ref().unwrap() get_migrated_db(self.url.clone().unwrap(), 1, 5)
.await
.unwrap()
} }
} }