diff --git a/web3_proxy/src/admin_queries.rs b/web3_proxy/src/admin_queries.rs index 928eff16..a16dfbc2 100644 --- a/web3_proxy/src/admin_queries.rs +++ b/web3_proxy/src/admin_queries.rs @@ -1,5 +1,6 @@ use crate::app::Web3ProxyApp; use crate::errors::{Web3ProxyError, Web3ProxyResponse}; +use crate::globals::{global_db_conn, global_db_replica_conn}; use crate::http_params::get_user_id_from_params; use axum::response::IntoResponse; use axum::{ @@ -42,8 +43,8 @@ pub async fn query_admin_modify_usertier<'a>( let mut response_body = HashMap::new(); // Establish connections - let db_conn = app.db_conn()?; - let db_replica = app.db_replica()?; + let db_conn = global_db_conn().await?; + let db_replica = global_db_replica_conn().await?; let mut redis_conn = app.redis_conn().await?; // Will modify logic here @@ -52,14 +53,14 @@ pub async fn query_admin_modify_usertier<'a>( // TODO: Make a single query, where you retrieve the user, and directly from it the secondary user (otherwise we do two jumpy, which is unnecessary) // get the user id first. if it is 0, we should use a cache on the app let caller_id = - get_user_id_from_params(&mut redis_conn, db_conn, db_replica, bearer, params).await?; + get_user_id_from_params(&mut redis_conn, &db_conn, &db_replica, bearer, params).await?; trace!(%caller_id, "query_admin_modify_usertier"); // Check if the caller is an admin (i.e. if he is in an admin table) let _admin = admin::Entity::find() .filter(admin::Column::UserId.eq(caller_id)) - .one(db_conn) + .one(&db_conn) .await? .ok_or(Web3ProxyError::AccessDenied("not an admin".into()))?; @@ -68,7 +69,7 @@ pub async fn query_admin_modify_usertier<'a>( // Fetch the admin, and the user let user: user::Model = user::Entity::find() .filter(user::Column::Address.eq(user_address.as_bytes())) - .one(db_conn) + .one(&db_conn) .await? .ok_or(Web3ProxyError::BadRequest( "No user with this id found".into(), @@ -82,7 +83,7 @@ pub async fn query_admin_modify_usertier<'a>( // Now we can modify the user's tier let new_user_tier: user_tier::Model = user_tier::Entity::find() .filter(user_tier::Column::Title.eq(user_tier_title.clone())) - .one(db_conn) + .one(&db_conn) .await? .ok_or(Web3ProxyError::BadRequest( "User Tier name was not found".into(), @@ -95,7 +96,7 @@ pub async fn query_admin_modify_usertier<'a>( user.user_tier_id = sea_orm::Set(new_user_tier.id); - user.save(db_conn).await?; + user.save(&db_conn).await?; info!("user's tier changed"); } @@ -103,7 +104,7 @@ pub async fn query_admin_modify_usertier<'a>( // Now delete all bearer tokens of this user login::Entity::delete_many() .filter(login::Column::UserId.eq(user.id)) - .exec(db_conn) + .exec(&db_conn) .await?; Ok(Json(&response_body).into_response()) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 03c058a0..3dfad6de 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -8,11 +8,12 @@ use crate::frontend::authorization::{ Authorization, RequestMetadata, RequestOrMethod, ResponseOrBytes, }; use crate::frontend::rpc_proxy_ws::ProxyMode; +use crate::globals::{global_db_conn, DatabaseError, DB_CONN, DB_REPLICA}; use crate::jsonrpc::{ JsonRpcErrorData, JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcId, JsonRpcParams, JsonRpcRequest, JsonRpcRequestEnum, JsonRpcResultData, }; -use crate::relational_db::{connect_db, get_migrated_db, DatabaseConnection, DatabaseReplica}; +use crate::relational_db::{connect_db, migrate_db}; use crate::response_cache::{ JsonRpcQueryCacheKey, JsonRpcResponseCache, JsonRpcResponseEnum, JsonRpcResponseWeigher, }; @@ -34,7 +35,7 @@ use ethers::utils::rlp::{Decodable, Rlp}; use futures::future::join_all; use futures::stream::{FuturesUnordered, StreamExt}; use hashbrown::{HashMap, HashSet}; -use migration::sea_orm::{DatabaseTransaction, EntityTrait, PaginatorTrait, TransactionTrait}; +use migration::sea_orm::{EntityTrait, PaginatorTrait}; use moka::future::{Cache, CacheBuilder}; use once_cell::sync::OnceCell; use redis_rate_limiter::redis::AsyncCommands; @@ -49,10 +50,10 @@ use std::str::FromStr; use std::sync::atomic::{AtomicU16, Ordering}; use std::sync::{atomic, Arc}; use std::time::Duration; -use tokio::runtime::Runtime; +use tokio::select; use tokio::sync::{broadcast, mpsc, oneshot, watch, Semaphore}; use tokio::task::JoinHandle; -use tokio::time::timeout; +use tokio::time::{sleep, timeout}; use tracing::{error, info, trace, warn, Level}; // TODO: make this customizable? @@ -87,10 +88,6 @@ pub struct Web3ProxyApp { /// don't drop this or the sender will stop working /// TODO: broadcast channel instead? pub watch_consensus_head_receiver: watch::Receiver>, - /// Optional database for users and accounting - pub db_conn: Option, - /// Optional read-only database for users and accounting - pub db_replica: Option, pub hostname: Option, pub frontend_port: Arc, /// rate limit anonymous users @@ -178,8 +175,13 @@ impl Web3ProxyApp { flush_stat_buffer_receiver: mpsc::Receiver>, ) -> anyhow::Result { let stat_buffer_shutdown_receiver = shutdown_sender.subscribe(); + let mut config_watcher_shutdown_receiver = shutdown_sender.subscribe(); let mut background_shutdown_receiver = shutdown_sender.subscribe(); + let (new_top_config_sender, mut new_top_config_receiver) = + watch::channel(top_config.clone()); + new_top_config_receiver.borrow_and_update(); + // safety checks on the config // while i would prefer this to be in a "apply_top_config" function, that is a larger refactor // TODO: maybe don't spawn with a config at all. have all config updates come through an apply_top_config call @@ -212,62 +214,6 @@ impl Web3ProxyApp { let important_background_handles: FuturesUnordered> = FuturesUnordered::new(); - // connect to the database and make sure the latest migrations have run - let mut db_conn = None::; - let mut db_replica = None::; - if let Some(db_url) = top_config.app.db_url.clone() { - let db_min_connections = top_config - .app - .db_min_connections - .unwrap_or(num_workers as u32); - - // TODO: what default multiple? - let db_max_connections = top_config - .app - .db_max_connections - .unwrap_or(db_min_connections * 2); - - db_conn = Some( - get_migrated_db(db_url.clone(), db_min_connections, db_max_connections).await?, - ); - - db_replica = if let Some(db_replica_url) = top_config.app.db_replica_url.clone() { - if db_replica_url == db_url { - // url is the same. do not make a new connection or we might go past our max connections - db_conn.clone().map(Into::into) - } else { - let db_replica_min_connections = top_config - .app - .db_replica_min_connections - .unwrap_or(db_min_connections); - - let db_replica_max_connections = top_config - .app - .db_replica_max_connections - .unwrap_or(db_max_connections); - - let db_replica = connect_db( - db_replica_url, - db_replica_min_connections, - db_replica_max_connections, - ) - .await?; - - Some(db_replica.into()) - } - } else { - // just clone so that we don't need a bunch of checks all over our code - db_conn.clone().map(Into::into) - }; - } else { - anyhow::ensure!( - top_config.app.db_replica_url.is_none(), - "if there is a db_replica_url, there must be a db_url" - ); - - warn!("no database. some features will be disabled"); - }; - // connect to kafka for logging requests from the /debug/ urls let mut kafka_producer: Option = None; @@ -379,7 +325,6 @@ impl Web3ProxyApp { let stat_sender = if let Some(spawned_stat_buffer) = StatBuffer::try_spawn( BILLING_PERIOD_SECONDS, top_config.app.chain_id, - db_conn.clone(), 60, top_config.app.influxdb_bucket.clone(), influxdb_client.clone(), @@ -472,6 +417,7 @@ impl Web3ProxyApp { let chain_id = top_config.app.chain_id; + // TODO: remove this. it should only be done by apply_top_config let (balanced_rpcs, balanced_handle, consensus_connections_watcher) = Web3Rpcs::spawn( chain_id, top_config.app.max_head_block_lag, @@ -487,6 +433,7 @@ impl Web3ProxyApp { // prepare a Web3Rpcs to hold all our private connections // only some chains have this, so this is optional + // TODO: remove this. it should only be done by apply_top_config let private_rpcs = if top_config.private_rpcs.is_none() { warn!("No private relays configured. Any transactions will be broadcast to the public mempool!"); None @@ -515,6 +462,7 @@ impl Web3ProxyApp { // prepare a Web3Rpcs to hold all our 4337 Abstraction Bundler connections // only some chains have this, so this is optional + // TODO: remove this. it should only be done by apply_top_config let bundler_4337_rpcs = if top_config.bundler_4337_rpcs.is_none() { warn!("No bundler_4337_rpcs configured"); None @@ -545,8 +493,6 @@ impl Web3ProxyApp { balanced_rpcs, bundler_4337_rpcs, config: top_config.app.clone(), - db_conn, - db_replica, frontend_port: frontend_port.clone(), frontend_ip_rate_limiter, frontend_registered_user_rate_limiter, @@ -568,31 +514,47 @@ impl Web3ProxyApp { watch_consensus_head_receiver, }; + // TODO: do apply_top_config once we don't duplicate the db + if let Err(err) = app.apply_top_config_db(&top_config).await { + warn!(?err, "unable to fully apply config while starting!"); + }; + let app = Arc::new(app); // watch for config changes - // TODO: initial config reload should be from this channel. not from the call to spawn - - let (new_top_config_sender, mut new_top_config_receiver) = watch::channel(top_config); - + // TODO: move this to its own function/struct { let app = app.clone(); let config_handle = tokio::spawn(async move { loop { let new_top_config = new_top_config_receiver.borrow_and_update().to_owned(); - if let Err(err) = app.apply_top_config(new_top_config).await { - error!("unable to apply config! {:?}", err); - }; + // TODO: compare new and old here? the sender should be doing that already but maybe its better here - new_top_config_receiver - .changed() - .await - .web3_context("failed awaiting top_config change")?; + if let Err(err) = app.apply_top_config_rpcs(&new_top_config).await { + error!(?err, "unable to apply config! Retrying in 10 seconds (or if the config changes)"); + + select! { + _ = config_watcher_shutdown_receiver.recv() => { + break; + } + _ = sleep(Duration::from_secs(10)) => {} + _ = new_top_config_receiver.changed() => {} + } + } else { + select! { + _ = config_watcher_shutdown_receiver.recv() => { + break; + } + _ = new_top_config_receiver.changed() => {} + } + } } + + Ok(()) }); - app_handles.push(config_handle); + important_background_handles.push(config_handle); } if important_background_handles.is_empty() { @@ -616,41 +578,167 @@ impl Web3ProxyApp { }) } - pub async fn apply_top_config(&self, new_top_config: TopConfig) -> Web3ProxyResult<()> { - // TODO: also update self.config from new_top_config.app + pub async fn apply_top_config(&self, new_top_config: &TopConfig) -> Web3ProxyResult<()> { + // TODO: update self.config from new_top_config.app (or move it entirely to a global) + + // connect to the db first + let db = self.apply_top_config_db(new_top_config).await; + + // then refresh rpcs + let rpcs = self.apply_top_config_rpcs(new_top_config).await; + + // error if either failed + // TODO: if both error, log both errors + db?; + rpcs?; + + Ok(()) + } + + async fn apply_top_config_rpcs(&self, new_top_config: &TopConfig) -> Web3ProxyResult<()> { info!("applying new config"); - // connect to the backends - self.balanced_rpcs - .apply_server_configs(self, new_top_config.balanced_rpcs) + let balanced = self + .balanced_rpcs + .apply_server_configs(self, new_top_config.balanced_rpcs.clone()) .await - .web3_context("updating balanced rpcs")?; + .web3_context("updating balanced rpcs"); - if let Some(private_rpc_configs) = new_top_config.private_rpcs { + let private = if let Some(private_rpc_configs) = new_top_config.private_rpcs.clone() { if let Some(ref private_rpcs) = self.private_rpcs { private_rpcs .apply_server_configs(self, private_rpc_configs) .await - .web3_context("updating private_rpcs")?; + .web3_context("updating private_rpcs") } else { // TODO: maybe we should have private_rpcs just be empty instead of being None todo!("handle toggling private_rpcs") } - } + } else { + Ok(()) + }; - if let Some(bundler_4337_rpc_configs) = new_top_config.bundler_4337_rpcs { - if let Some(ref bundler_4337_rpcs) = self.bundler_4337_rpcs { - bundler_4337_rpcs - .apply_server_configs(self, bundler_4337_rpc_configs) - .await - .web3_context("updating bundler_4337_rpcs")?; + let bundler_4337 = + if let Some(bundler_4337_rpc_configs) = new_top_config.bundler_4337_rpcs.clone() { + if let Some(ref bundler_4337_rpcs) = self.bundler_4337_rpcs { + bundler_4337_rpcs + .apply_server_configs(self, bundler_4337_rpc_configs.clone()) + .await + .web3_context("updating bundler_4337_rpcs") + } else { + // TODO: maybe we should have bundler_4337_rpcs just be empty instead of being None + todo!("handle toggling bundler_4337_rpcs") + } } else { - // TODO: maybe we should have bundler_4337_rpcs just be empty instead of being None - todo!("handle toggling bundler_4337_rpcs") - } - } + Ok(()) + }; - info!("config applied successfully"); + // TODO: log all the errors if there are multiple + balanced?; + private?; + bundler_4337?; + + Ok(()) + } + + pub async fn apply_top_config_db(&self, new_top_config: &TopConfig) -> Web3ProxyResult<()> { + // TODO: get the actual value + let num_workers = 2; + + // connect to the db + // THIS DOES NOT RUN MIGRATIONS! + if let Some(db_url) = new_top_config.app.db_url.clone() { + let db_min_connections = new_top_config + .app + .db_min_connections + .unwrap_or(num_workers as u32); + + // TODO: what default multiple? + let db_max_connections = new_top_config + .app + .db_max_connections + .unwrap_or(db_min_connections * 2); + + let db_conn = if let Ok(old_db_conn) = global_db_conn().await { + // TODO: compare old settings with new settings. don't always re-use! + Ok(old_db_conn) + } else { + connect_db(db_url.clone(), db_min_connections, db_max_connections) + .await + .map_err(|err| DatabaseError::Connect(Arc::new(err))) + }; + + let db_replica = if let Ok(db_conn) = db_conn.as_ref() { + let db_replica = + if let Some(db_replica_url) = new_top_config.app.db_replica_url.clone() { + if db_replica_url == db_url { + // url is the same. do not make a new connection or we might go past our max connections + Ok(db_conn.clone().into()) + } else { + let db_replica_min_connections = new_top_config + .app + .db_replica_min_connections + .unwrap_or(db_min_connections); + + let db_replica_max_connections = new_top_config + .app + .db_replica_max_connections + .unwrap_or(db_max_connections); + + let db_replica = if let Ok(old_db_replica) = global_db_conn().await { + // TODO: compare old settings with new settings. don't always re-use! + Ok(old_db_replica) + } else { + connect_db( + db_replica_url, + db_replica_min_connections, + db_replica_max_connections, + ) + .await + .map_err(|err| DatabaseError::Connect(Arc::new(err))) + }; + + // if db_replica is error, but db_conn is success. log error and just use db_conn + + if let Err(err) = db_replica.as_ref() { + error!(?err, "db replica is down! using primary"); + + Ok(db_conn.clone().into()) + } else { + db_replica.map(Into::into) + } + } + } else { + // just clone so that we don't need a bunch of checks all over our code + trace!("no db replica configured"); + Ok(db_conn.clone().into()) + }; + + // db and replica are connected. try to migrate + if let Err(err) = migrate_db(db_conn, false).await { + error!(?err, "unable to migrate!"); + } + + db_replica + } else { + db_conn.clone().map(Into::into) + }; + + let mut locked_conn = DB_CONN.write().await; + let mut locked_replica = DB_REPLICA.write().await; + + *locked_conn = db_conn.clone(); + *locked_replica = db_replica.clone(); + + db_conn?; + db_replica?; + + info!("set global db connections"); + } else if new_top_config.app.db_replica_url.is_some() { + return Err(anyhow::anyhow!("db_replica_url set, but no db_url set!").into()); + } else { + warn!("no database. some features will be disabled"); + }; Ok(()) } @@ -662,7 +750,7 @@ impl Web3ProxyApp { pub fn influxdb_client(&self) -> Web3ProxyResult<&influxdb2::Client> { self.influxdb_client .as_ref() - .ok_or(Web3ProxyError::NoDatabase) + .ok_or(Web3ProxyError::NoDatabaseConfigured) } /// an ethers provider that you can use with ether's abigen. @@ -702,8 +790,8 @@ impl Web3ProxyApp { #[derive(Default, Serialize)] struct UserCount(i64); - let user_count: UserCount = if let Ok(db) = self.db_conn() { - match user::Entity::find().count(db).await { + let user_count: UserCount = if let Ok(db) = global_db_conn().await { + match user::Entity::find().count(&db).await { Ok(user_count) => UserCount(user_count as i64), Err(err) => { warn!(?err, "unable to count users"); @@ -864,9 +952,7 @@ impl Web3ProxyApp { method: &str, params: P, ) -> Web3ProxyResult { - let db_conn = self.db_conn().ok().cloned(); - - let authorization = Arc::new(Authorization::internal(db_conn)?); + let authorization = Arc::new(Authorization::internal()?); self.authorized_request(method, params, authorization).await } @@ -988,29 +1074,9 @@ impl Web3ProxyApp { Ok((collected, collected_rpcs)) } - #[inline] - pub fn db_conn(&self) -> Web3ProxyResult<&DatabaseConnection> { - self.db_conn.as_ref().ok_or(Web3ProxyError::NoDatabase) - } - - #[inline] - pub async fn db_transaction(&self) -> Web3ProxyResult { - if let Some(ref db_conn) = self.db_conn { - let x = db_conn.begin().await?; - Ok(x) - } else { - Err(Web3ProxyError::NoDatabase) - } - } - - #[inline] - pub fn db_replica(&self) -> Web3ProxyResult<&DatabaseReplica> { - self.db_replica.as_ref().ok_or(Web3ProxyError::NoDatabase) - } - pub async fn redis_conn(&self) -> Web3ProxyResult { match self.vredis_pool.as_ref() { - None => Err(Web3ProxyError::NoDatabase), + None => Err(Web3ProxyError::NoDatabaseConfigured), Some(redis_pool) => { // TODO: add a From for this let redis_conn = redis_pool.get().await.context("redis pool error")?; @@ -1452,7 +1518,7 @@ impl Web3ProxyApp { .zadd(recent_tx_hash_key, hashed_tx_hash.to_string(), now) .await?; } - Err(Web3ProxyError::NoDatabase) => {}, + Err(Web3ProxyError::NoDatabaseConfigured) => {}, Err(err) => { warn!( ?err, diff --git a/web3_proxy/src/balance.rs b/web3_proxy/src/balance.rs index 4db43ea5..5b2a5169 100644 --- a/web3_proxy/src/balance.rs +++ b/web3_proxy/src/balance.rs @@ -10,7 +10,7 @@ use migration::{Func, SimpleExpr}; use serde::ser::SerializeStruct; use serde::{Deserialize, Serialize}; use serde_json::json; -use tracing::info; +use tracing::trace; /// Implements the balance getter which combines data from several tables #[derive(Clone, Debug, Default, Deserialize)] @@ -220,8 +220,7 @@ impl Balance { user_id, }; - // TODO: lower log level - info!("balance: {:#}", json!(&balance)); + trace!("balance: {:#}", json!(&balance)); // Return None if there is no entry Ok(Some(balance)) diff --git a/web3_proxy/src/caches.rs b/web3_proxy/src/caches.rs index ab5c7695..9624929b 100644 --- a/web3_proxy/src/caches.rs +++ b/web3_proxy/src/caches.rs @@ -1,5 +1,5 @@ use crate::balance::Balance; -use crate::errors::Web3ProxyResult; +use crate::errors::{Web3ProxyError, Web3ProxyResult}; use crate::frontend::authorization::{AuthorizationChecks, RpcSecretKey}; use derive_more::From; use entities::rpc_key; @@ -50,7 +50,7 @@ impl UserBalanceCache { }; trace!(?x, "from database"); - Ok(Arc::new(AsyncRwLock::new(x))) + Ok::<_, Web3ProxyError>(Arc::new(AsyncRwLock::new(x))) }) .await?; diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 7d51b8cb..5fe5a79c 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -6,7 +6,6 @@ use ethers::prelude::{Address, TxHash}; use ethers::types::{U256, U64}; use hashbrown::HashMap; use migration::sea_orm::prelude::Decimal; -use migration::sea_orm::DatabaseConnection; use sentry::types::Dsn; use serde::Deserialize; use serde_inline_default::serde_inline_default; @@ -276,7 +275,6 @@ impl Web3RpcConfig { pub async fn spawn( self, name: String, - db_conn: Option, redis_pool: Option, chain_id: u64, block_interval: Duration, @@ -293,7 +291,6 @@ impl Web3RpcConfig { self, name, chain_id, - db_conn, http_client, redis_pool, block_interval, diff --git a/web3_proxy/src/errors.rs b/web3_proxy/src/errors.rs index e359f7e8..bc577d65 100644 --- a/web3_proxy/src/errors.rs +++ b/web3_proxy/src/errors.rs @@ -40,7 +40,6 @@ impl From for Web3ProxyResult<()> { } } -// TODO: replace all String with `Cow<'static, str>` #[derive(Debug, Display, Error, From)] pub enum Web3ProxyError { Abi(ethers::abi::Error), @@ -59,6 +58,7 @@ pub enum Web3ProxyError { BadRouting, Contract(ContractError), Database(DbErr), + DatabaseArc(Arc), Decimal(DecimalError), EthersHttpClient(ethers::prelude::HttpClientError), EthersProvider(ethers::prelude::ProviderError), @@ -100,7 +100,7 @@ pub enum Web3ProxyError { NoBlockNumberOrHash, NoBlocksKnown, NoConsensusHeadBlock, - NoDatabase, + NoDatabaseConfigured, NoHandleReady, NoServersSynced, #[display(fmt = "{}/{}", num_known, min_head_rpcs)] @@ -271,6 +271,17 @@ impl Web3ProxyError { }, ) } + Self::DatabaseArc(err) => { + error!(?err, "database (arc) err: {}", err); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcErrorData { + message: "database (arc) error!".into(), + code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), + data: None, + }, + ) + } Self::Decimal(err) => { debug!(?err, "Decimal Error: {}", err); ( @@ -614,13 +625,13 @@ impl Web3ProxyError { }, ) } - Self::NoDatabase => { + Self::NoDatabaseConfigured => { // TODO: this needs more context - error!("no database configured"); + debug!("no database configured"); ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: "no database configured!".into(), + message: "no database configured! this request needs a database".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -1088,6 +1099,26 @@ impl Web3ProxyError { (status_code, Json(response)).into_response() } + + /// some things should keep going even if the db is down + pub fn split_db_errors(&self) -> Result<&Self, &Self> { + match self { + Web3ProxyError::NoDatabaseConfigured => Ok(self), + Web3ProxyError::Database(err) => { + warn!(?err, "db error while checking rpc key authorization"); + Ok(self) + } + Web3ProxyError::DatabaseArc(err) => { + warn!(?err, "db arc error while checking rpc key authorization"); + Ok(self) + } + Web3ProxyError::Arc(x) => { + // errors from inside moka cache helpers are wrapped in an Arc + x.split_db_errors() + } + _ => Err(self), + } + } } impl From for Web3ProxyError { diff --git a/web3_proxy/src/frontend/admin.rs b/web3_proxy/src/frontend/admin.rs index a8064858..d270809a 100644 --- a/web3_proxy/src/frontend/admin.rs +++ b/web3_proxy/src/frontend/admin.rs @@ -6,6 +6,7 @@ use crate::app::Web3ProxyApp; use crate::errors::Web3ProxyResponse; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext}; use crate::frontend::users::authentication::PostLogin; +use crate::globals::{global_db_conn, global_db_replica_conn}; use crate::premium::{get_user_and_tier_from_address, grant_premium_tier}; use crate::user_token::UserBearerToken; use axum::{ @@ -26,6 +27,7 @@ use http::StatusCode; use migration::sea_orm::prelude::{Decimal, Uuid}; use migration::sea_orm::{ self, ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, + TransactionTrait, }; use serde::{Deserialize, Serialize}; use serde_json::json; @@ -57,7 +59,8 @@ pub async fn admin_increase_balance( let caller = app.bearer_is_authorized(bearer).await?; // Establish connections - let txn = app.db_transaction().await?; + let db_conn = global_db_conn().await?; + let txn = db_conn.begin().await?; // Check if the caller is an admin (if not, return early) let admin_entry: admin::Model = admin::Entity::find() @@ -90,7 +93,7 @@ pub async fn admin_increase_balance( // Invalidate the user_balance_cache for this user: if let Err(err) = app .user_balance_cache - .invalidate(&user_entry.id, app.db_conn()?, &app.rpc_secret_key_cache) + .invalidate(&user_entry.id, &db_conn, &app.rpc_secret_key_cache) .await { warn!(?err, "unable to invalidate caches"); @@ -194,8 +197,8 @@ pub async fn admin_imitate_login_get( resources: vec![], }; - let db_conn = app.db_conn()?; - let db_replica = app.db_replica()?; + let db_conn = global_db_conn().await?; + let db_replica = global_db_replica_conn().await?; let admin = user::Entity::find() .filter(user::Column::Address.eq(admin_address.as_bytes())) @@ -219,7 +222,7 @@ pub async fn admin_imitate_login_get( let now = Utc::now(); let delete_result = pending_login::Entity::delete_many() .filter(pending_login::Column::ExpiresAt.lte(now)) - .exec(db_conn) + .exec(&db_conn) .await?; trace!("cleared expired pending_logins: {:?}", delete_result); @@ -233,7 +236,7 @@ pub async fn admin_imitate_login_get( }; trail - .save(db_conn) + .save(&db_conn) .await .web3_context("saving user's pending_login")?; @@ -256,7 +259,7 @@ pub async fn admin_imitate_login_get( }; user_pending_login - .save(db_conn) + .save(&db_conn) .await .web3_context("saving an admin trail pre login")?; @@ -333,7 +336,7 @@ pub async fn admin_imitate_login_post( })?; // fetch the message we gave them from our database - let db_replica = app.db_replica()?; + let db_replica = global_db_replica_conn().await?; let user_pending_login = pending_login::Entity::find() .filter(pending_login::Column::Nonce.eq(Uuid::from(login_nonce))) @@ -376,7 +379,7 @@ pub async fn admin_imitate_login_post( .await? .web3_context("admin address was not found!")?; - let db_conn = app.db_conn()?; + let db_conn = global_db_conn().await?; // Add a message that the admin has logged in // Note that the admin is trying to log in as this user @@ -388,7 +391,7 @@ pub async fn admin_imitate_login_post( ..Default::default() }; trail - .save(db_conn) + .save(&db_conn) .await .web3_context("saving an admin trail post login")?; @@ -437,11 +440,15 @@ pub async fn admin_imitate_login_post( }; user_login - .save(db_conn) + .save(&db_conn) .await .web3_context("saving user login")?; - if let Err(err) = user_pending_login.into_active_model().delete(db_conn).await { + if let Err(err) = user_pending_login + .into_active_model() + .delete(&db_conn) + .await + { warn!(none=?login_nonce.0, ?err, "Failed to delete nonce"); } diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 64667144..8300d96b 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -5,6 +5,7 @@ use crate::app::{Web3ProxyApp, APP_USER_AGENT}; use crate::balance::Balance; use crate::caches::RegisteredUserRateLimitKey; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; +use crate::globals::global_db_replica_conn; use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; use crate::rpcs::blockchain::Web3ProxyBlock; use crate::rpcs::one::Web3Rpc; @@ -26,8 +27,7 @@ use hashbrown::HashMap; use http::HeaderValue; use ipnet::IpNet; use migration::sea_orm::prelude::Decimal; -use migration::sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter}; -use parking_lot::Mutex; +use migration::sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; use rdkafka::message::{Header as KafkaHeader, OwnedHeaders as KafkaOwnedHeaders, OwnedMessage}; use rdkafka::producer::{FutureProducer, FutureRecord}; use rdkafka::util::Timeout as KafkaTimeout; @@ -47,7 +47,7 @@ use tokio::sync::RwLock as AsyncRwLock; use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore}; use tokio::task::JoinHandle; use tokio::time::Instant; -use tracing::{error, info, trace, warn}; +use tracing::{error, trace, warn}; use ulid::Ulid; use uuid::Uuid; @@ -173,7 +173,6 @@ pub struct AuthorizationChecks { #[derive(Clone, Debug)] pub struct Authorization { pub checks: AuthorizationChecks, - pub db_conn: Option, pub ip: IpAddr, pub origin: Option, pub referer: Option, @@ -390,7 +389,7 @@ pub struct RequestMetadata { impl Default for Authorization { fn default() -> Self { - Authorization::internal(None).unwrap() + Authorization::internal().unwrap() } } @@ -670,7 +669,7 @@ impl From for Uuid { } impl Authorization { - pub fn internal(db_conn: Option) -> Web3ProxyResult { + pub fn internal() -> Web3ProxyResult { let authorization_checks = AuthorizationChecks { // any error logs on a local (internal) query are likely problems. log them all log_revert_chance: 100, @@ -683,7 +682,6 @@ impl Authorization { Self::try_new( authorization_checks, - db_conn, &ip, None, None, @@ -694,7 +692,6 @@ impl Authorization { pub fn external( allowed_origin_requests_per_period: &HashMap, - db_conn: Option, ip: &IpAddr, origin: Option<&Origin>, proxy_mode: ProxyMode, @@ -719,7 +716,6 @@ impl Authorization { Self::try_new( authorization_checks, - db_conn, ip, origin, referer, @@ -731,7 +727,6 @@ impl Authorization { #[allow(clippy::too_many_arguments)] pub fn try_new( authorization_checks: AuthorizationChecks, - db_conn: Option, ip: &IpAddr, origin: Option<&Origin>, referer: Option<&Referer>, @@ -786,7 +781,6 @@ impl Authorization { Ok(Self { checks: authorization_checks, - db_conn, ip: *ip, origin: origin.cloned(), referer: referer.cloned(), @@ -994,7 +988,7 @@ impl Web3ProxyApp { let user_bearer_token = UserBearerToken::try_from(bearer)?; // get the attached address from the database for the given auth_token. - let db_replica = self.db_replica()?; + let db_replica = global_db_replica_conn().await?; let user_bearer_uuid: Uuid = user_bearer_token.into(); @@ -1019,7 +1013,6 @@ impl Web3ProxyApp { // we don't care about user agent or origin or referer let authorization = Authorization::external( &self.config.allowed_origin_requests_per_period, - self.db_conn().ok().cloned(), &ip, None, proxy_mode, @@ -1074,7 +1067,7 @@ impl Web3ProxyApp { ) -> Web3ProxyResult { if ip.is_loopback() { // TODO: localhost being unlimited should be optional - let authorization = Authorization::internal(self.db_conn().ok().cloned())?; + let authorization = Authorization::internal()?; return Ok(RateLimitResult::Allowed(authorization, None)); } @@ -1085,7 +1078,6 @@ impl Web3ProxyApp { // they do check origin because we can override rate limits for some origins let authorization = Authorization::external( allowed_origin_requests_per_period, - self.db_conn().ok().cloned(), ip, origin, proxy_mode, @@ -1145,7 +1137,7 @@ impl Web3ProxyApp { let x = self .rpc_secret_key_cache .try_get_with_by_ref(rpc_secret_key, async move { - let db_replica = self.db_replica()?; + let db_replica = global_db_replica_conn().await?; // TODO: join the user table to this to return the User? we don't always need it // TODO: join on secondary users @@ -1270,7 +1262,7 @@ impl Web3ProxyApp { let rpc_key_id = Some(rpc_key_model.id.try_into().context("db ids are never 0")?); - Ok(AuthorizationChecks { + Ok::<_, Web3ProxyError>(AuthorizationChecks { allowed_ips, allowed_origins, allowed_referers, @@ -1307,10 +1299,25 @@ impl Web3ProxyApp { rpc_key: &RpcSecretKey, user_agent: Option<&UserAgent>, ) -> Web3ProxyResult { - let authorization_checks = self.authorization_checks(proxy_mode, rpc_key).await?; + let authorization_checks = match self.authorization_checks(proxy_mode, rpc_key).await { + Ok(x) => x, + Err(err) => { + if let Ok(_err) = err.split_db_errors() { + // // TODO: this is too verbose during an outage. the warnings on the config reloader should be fine + // warn!( + // ?err, + // "db is down. cannot check rpc key. fallback to ip rate limits" + // ); + return self.rate_limit_by_ip(ip, origin, proxy_mode).await; + } + + return Err(err); + } + }; // if no rpc_key_id matching the given rpc was found, then we can't rate limit by key if authorization_checks.rpc_secret_key_id.is_none() { + trace!("unknown key. falling back to free limits"); return self.rate_limit_by_ip(ip, origin, proxy_mode).await; } @@ -1320,7 +1327,6 @@ impl Web3ProxyApp { let authorization = Authorization::try_new( authorization_checks, - self.db_conn().ok().cloned(), ip, origin, referer, diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index 2ee643d7..6c5a074a 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -259,25 +259,28 @@ pub async fn serve( .layer(Extension(Arc::new(response_cache))) // request id .layer( - TraceLayer::new_for_http().make_span_with(|request: &Request| { - // We get the request id from the header - // If no header, a new Ulid is created - // TODO: move this header name to config - let request_id = request - .headers() - .get("x-amzn-trace-id") - .and_then(|x| x.to_str().ok()) - .map(ToString::to_string) - .unwrap_or_else(|| Ulid::new().to_string()); + TraceLayer::new_for_http() + .make_span_with(|request: &Request| { + // We get the request id from the header + // If no header, a new Ulid is created + // TODO: move this header name to config + let request_id = request + .headers() + .get("x-amzn-trace-id") + .and_then(|x| x.to_str().ok()) + .map(ToString::to_string) + .unwrap_or_else(|| Ulid::new().to_string()); - // And then we put it along with other information into the `request` span - error_span!( - "request", - id = %request_id, - // method = %request.method(), - // path = %request.uri().path(), - ) - }), + // And then we put it along with other information into the `request` span + error_span!( + "request", + id = %request_id, + // method = %request.method(), + // path = %request.uri().path(), + ) + }) + // TODO: on failure that has the request and response body so we can debug more easily + .on_failure(()), ) // 404 for any unknown routes .fallback(errors::handler_404); diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index d1a8538e..83164d19 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -35,7 +35,7 @@ use std::str::from_utf8_mut; use std::sync::atomic::AtomicU64; use std::sync::Arc; use tokio::sync::{broadcast, mpsc, OwnedSemaphorePermit, RwLock as AsyncRwLock}; -use tracing::{info, trace}; +use tracing::trace; /// How to select backend servers for a request #[derive(Copy, Clone, Debug, Default)] @@ -484,7 +484,7 @@ async fn read_web3_socket( return; } Message::Close(_) => { - info!("closing websocket connection"); + trace!("closing websocket connection"); // TODO: do something to close subscriptions? let _ = close_sender.send(true); return; diff --git a/web3_proxy/src/frontend/users/authentication.rs b/web3_proxy/src/frontend/users/authentication.rs index 2cb95a8b..1d8975f4 100644 --- a/web3_proxy/src/frontend/users/authentication.rs +++ b/web3_proxy/src/frontend/users/authentication.rs @@ -2,6 +2,7 @@ use crate::app::Web3ProxyApp; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResponse}; use crate::frontend::authorization::{login_is_authorized, RpcSecretKey}; +use crate::globals::{global_db_conn, global_db_replica_conn}; use crate::user_token::UserBearerToken; use axum::{ extract::{Path, Query}, @@ -124,12 +125,12 @@ pub async fn user_login_get( resources: vec![], }; - let db_conn = app.db_conn()?; + let db_conn = global_db_conn().await?; // delete any expired logins if let Err(err) = login::Entity::delete_many() .filter(login::Column::ExpiresAt.lte(now)) - .exec(db_conn) + .exec(&db_conn) .await { warn!(?err, "expired_logins"); @@ -138,7 +139,7 @@ pub async fn user_login_get( // delete any expired pending logins if let Err(err) = pending_login::Entity::delete_many() .filter(pending_login::Column::ExpiresAt.lte(now)) - .exec(db_conn) + .exec(&db_conn) .await { warn!(?err, "expired_pending_logins"); @@ -160,7 +161,7 @@ pub async fn user_login_get( }; user_pending_login - .save(db_conn) + .save(&db_conn) .await .web3_context("saving user's pending_login")?; @@ -261,7 +262,7 @@ pub async fn user_login_post( let login_nonce = UserBearerToken::from_str(&their_msg.nonce)?; // fetch the message we gave them from our database - let db_replica = app.db_replica()?; + let db_replica = global_db_replica_conn().await?; let user_pending_login = pending_login::Entity::find() .filter(pending_login::Column::Nonce.eq(Uuid::from(login_nonce))) @@ -293,7 +294,7 @@ pub async fn user_login_post( .one(db_replica.as_ref()) .await?; - let db_conn = app.db_conn()?; + let db_conn = global_db_conn().await?; let (caller, user_rpc_keys, status_code) = match caller { None => { @@ -384,7 +385,7 @@ pub async fn user_login_post( // the user is already registered let user_rpc_keys = rpc_key::Entity::find() .filter(rpc_key::Column::UserId.eq(caller.id)) - .all(db_conn) + .all(&db_conn) .await .web3_context("failed loading user's key")?; @@ -411,11 +412,15 @@ pub async fn user_login_post( }; user_login - .save(db_conn) + .save(&db_conn) .await .web3_context("saving user login")?; - if let Err(err) = user_pending_login.into_active_model().delete(db_conn).await { + if let Err(err) = user_pending_login + .into_active_model() + .delete(&db_conn) + .await + { error!("Failed to delete nonce:{}: {}", login_nonce, err); } @@ -438,16 +443,15 @@ pub async fn user_login_post( /// `POST /user/logout` - Forget the bearer token in the `Authentication` header. #[debug_handler] pub async fn user_logout_post( - Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, ) -> Web3ProxyResponse { let user_bearer = UserBearerToken::try_from(bearer)?; - let db_conn = app.db_conn()?; + let db_conn = global_db_conn().await?; if let Err(err) = login::Entity::delete_many() .filter(login::Column::BearerToken.eq(user_bearer.uuid())) - .exec(db_conn) + .exec(&db_conn) .await { warn!(key=%user_bearer.redis_key(), ?err, "Failed to delete from redis"); diff --git a/web3_proxy/src/frontend/users/mod.rs b/web3_proxy/src/frontend/users/mod.rs index 1caaf3a3..3d428fa7 100644 --- a/web3_proxy/src/frontend/users/mod.rs +++ b/web3_proxy/src/frontend/users/mod.rs @@ -9,6 +9,7 @@ pub mod subuser; use crate::app::Web3ProxyApp; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResponse}; +use crate::globals::global_db_transaction; use axum::{ headers::{authorization::Bearer, Authorization}, response::IntoResponse, @@ -80,7 +81,7 @@ pub async fn user_post( } } - let txn = app.db_transaction().await?; + let txn = global_db_transaction().await?; // update the referral code IFF they do not already have one set if let Some(x) = payload.referral_code { diff --git a/web3_proxy/src/frontend/users/payment.rs b/web3_proxy/src/frontend/users/payment.rs index 2a2f7e1b..265fabdd 100644 --- a/web3_proxy/src/frontend/users/payment.rs +++ b/web3_proxy/src/frontend/users/payment.rs @@ -3,6 +3,7 @@ use crate::balance::Balance; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResponse, Web3ProxyResult}; use crate::frontend::authorization::login_is_authorized; use crate::frontend::users::authentication::register_new_user; +use crate::globals::{global_db_conn, global_db_replica_conn}; use crate::premium::{get_user_and_tier_from_address, grant_premium_tier}; use anyhow::Context; use axum::{ @@ -45,7 +46,7 @@ pub async fn user_balance_get( ) -> Web3ProxyResponse { let user = app.bearer_is_authorized(bearer).await?; - let db_replica = app.db_replica()?; + let db_replica = global_db_replica_conn().await?; let user_balance = match Balance::try_from_db(db_replica.as_ref(), user.id).await? { None => Balance::default(), @@ -65,7 +66,7 @@ pub async fn user_chain_deposits_get( ) -> Web3ProxyResponse { let user = app.bearer_is_authorized(bearer).await?; - let db_replica = app.db_replica()?; + let db_replica = global_db_replica_conn().await?; // Filter by user ... let receipts = increase_on_chain_balance_receipt::Entity::find() @@ -104,7 +105,7 @@ pub async fn user_stripe_deposits_get( ) -> Web3ProxyResponse { let user = app.bearer_is_authorized(bearer).await?; - let db_replica = app.db_replica()?; + let db_replica = global_db_replica_conn().await?; // Filter by user ... let receipts = stripe_increase_balance_receipt::Entity::find() @@ -147,7 +148,7 @@ pub async fn user_admin_deposits_get( ) -> Web3ProxyResponse { let user = app.bearer_is_authorized(bearer).await?; - let db_replica = app.db_replica()?; + let db_replica = global_db_replica_conn().await?; // Filter by user ... let receipts = admin_increase_balance_receipt::Entity::find() @@ -206,7 +207,7 @@ pub async fn user_balance_post( Web3ProxyError::BadRequest(format!("unable to parse tx_hash: {}", err).into()) })?; - let db_conn = app.db_conn()?; + let db_conn = global_db_conn().await?; // get the transaction receipt let transaction_receipt = app @@ -235,7 +236,7 @@ pub async fn user_balance_post( true }; - let uncle_hashes = find_uncles.all(db_conn).await?; + let uncle_hashes = find_uncles.all(&db_conn).await?; let uncle_hashes: HashSet<_> = uncle_hashes .into_iter() @@ -283,7 +284,7 @@ pub async fn user_balance_post( .filter(increase_on_chain_balance_receipt::Column::TxHash.eq(tx_hash.encode_hex())) .filter(increase_on_chain_balance_receipt::Column::ChainId.eq(app.config.chain_id)) .filter(increase_on_chain_balance_receipt::Column::BlockHash.eq(block_hash.encode_hex())) - .one(db_conn) + .one(&db_conn) .await? .is_some() { @@ -413,7 +414,7 @@ pub async fn user_balance_post( // invalidate the cache as well if let Err(err) = app .user_balance_cache - .invalidate(&recipient.id, db_conn, &app.rpc_secret_key_cache) + .invalidate(&recipient.id, &db_conn, &app.rpc_secret_key_cache) .await { warn!(?err, user_id=%recipient.id, "unable to invalidate cache"); @@ -484,12 +485,12 @@ pub async fn handle_uncle_block( // user_id -> balance that we need to subtract let mut reversed_balances: HashMap = HashMap::new(); - let db_conn = app.db_conn()?; + let db_conn = global_db_conn().await?; // delete any deposit txids with uncle_hash for reversed_deposit in increase_on_chain_balance_receipt::Entity::find() .filter(increase_on_chain_balance_receipt::Column::BlockHash.eq(uncle_hash.encode_hex())) - .all(db_conn) + .all(&db_conn) .await? { let user_id = reversed_deposit.deposit_to_user_id; @@ -499,11 +500,11 @@ pub async fn handle_uncle_block( *reversed_balance += reversed_deposit.amount; // TODO: instead of delete, mark as uncled? seems like it would bloat the db unnecessarily. a stat should be enough - reversed_deposit.delete(db_conn).await?; + reversed_deposit.delete(&db_conn).await?; if let Err(err) = app .user_balance_cache - .invalidate(&user_id, db_conn, &app.rpc_secret_key_cache) + .invalidate(&user_id, &db_conn, &app.rpc_secret_key_cache) .await { warn!(%user_id, ?err, "unable to invalidate caches"); diff --git a/web3_proxy/src/frontend/users/payment_stripe.rs b/web3_proxy/src/frontend/users/payment_stripe.rs index 8ae3b3f5..cb447d77 100644 --- a/web3_proxy/src/frontend/users/payment_stripe.rs +++ b/web3_proxy/src/frontend/users/payment_stripe.rs @@ -1,5 +1,6 @@ use crate::app::Web3ProxyApp; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResponse}; +use crate::globals::global_db_conn; use crate::premium::grant_premium_tier; use anyhow::Context; use axum::{response::IntoResponse, Extension}; @@ -68,13 +69,15 @@ pub async fn user_balance_stripe_post( return Ok("Received Webhook".into_response()); } - let db_conn = app.db_conn().context("query_user_stats needs a db")?; + let db_conn = global_db_conn() + .await + .web3_context("query_user_stats needs a db")?; if stripe_increase_balance_receipt::Entity::find() .filter( stripe_increase_balance_receipt::Column::StripePaymentIntendId.eq(intent.id.as_str()), ) - .one(db_conn) + .one(&db_conn) .await? .is_some() { @@ -93,7 +96,7 @@ pub async fn user_balance_stripe_post( ))?; let recipient: Option = user::Entity::find_by_id(recipient_user_id) - .one(db_conn) + .one(&db_conn) .await?; // we do a fixed 2 decimal points because we only accept USD for now @@ -121,7 +124,7 @@ pub async fn user_balance_stripe_post( currency=%intent.currency, %recipient_user_id, %intent.id, "Please refund this transaction!", ); - let _ = insert_receipt_model.save(db_conn).await; + let _ = insert_receipt_model.save(&db_conn).await; return Ok("Received Webhook".into_response()); } @@ -145,7 +148,7 @@ pub async fn user_balance_stripe_post( // Finally invalidate the cache as well if let Err(err) = app .user_balance_cache - .invalidate(&recipient.id, db_conn, &app.rpc_secret_key_cache) + .invalidate(&recipient.id, &db_conn, &app.rpc_secret_key_cache) .await { warn!(?err, "unable to invalidate caches"); diff --git a/web3_proxy/src/frontend/users/referral.rs b/web3_proxy/src/frontend/users/referral.rs index 29ee5fba..f7d09f88 100644 --- a/web3_proxy/src/frontend/users/referral.rs +++ b/web3_proxy/src/frontend/users/referral.rs @@ -1,7 +1,8 @@ //! Handle registration, logins, and managing account data. -use crate::app::Web3ProxyApp; use crate::errors::Web3ProxyResponse; +use crate::globals::global_db_conn; use crate::referral_code::ReferralCode; +use crate::{app::Web3ProxyApp, globals::global_db_replica_conn}; use anyhow::Context; use axum::{ extract::Query, @@ -35,7 +36,7 @@ pub async fn user_referral_link_get( // First get the bearer token and check if the user is logged in let user = app.bearer_is_authorized(bearer).await?; - let db_replica = app.db_replica()?; + let db_replica = global_db_replica_conn().await?; // Then get the referral token. If one doesn't exist, create one let user_referrer = referrer::Entity::find() @@ -47,7 +48,7 @@ pub async fn user_referral_link_get( Some(x) => (x.referral_code, StatusCode::OK), None => { // Connect to the database for writes - let db_conn = app.db_conn()?; + let db_conn = global_db_conn().await?; let referral_code = ReferralCode::default().to_string(); @@ -56,7 +57,7 @@ pub async fn user_referral_link_get( referral_code: sea_orm::ActiveValue::Set(referral_code.clone()), ..Default::default() }; - referrer_entry.save(db_conn).await?; + referrer_entry.save(&db_conn).await?; (referral_code, StatusCode::CREATED) } @@ -80,7 +81,7 @@ pub async fn user_used_referral_stats( // First get the bearer token and check if the user is logged in let user = app.bearer_is_authorized(bearer).await?; - let db_replica = app.db_replica()?; + let db_replica = global_db_replica_conn().await?; // Get all referral records associated with this user let referrals = referee::Entity::find() @@ -138,7 +139,7 @@ pub async fn user_shared_referral_stats( // First get the bearer token and check if the user is logged in let user = app.bearer_is_authorized(bearer).await?; - let db_replica = app.db_replica()?; + let db_replica = global_db_replica_conn().await?; // Get all referral records associated with this user let query_result = referrer::Entity::find() diff --git a/web3_proxy/src/frontend/users/rpc_keys.rs b/web3_proxy/src/frontend/users/rpc_keys.rs index f725f616..dce4fc22 100644 --- a/web3_proxy/src/frontend/users/rpc_keys.rs +++ b/web3_proxy/src/frontend/users/rpc_keys.rs @@ -2,6 +2,7 @@ use super::super::authorization::RpcSecretKey; use crate::app::Web3ProxyApp; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResponse}; +use crate::globals::{global_db_conn, global_db_replica_conn}; use axum::headers::{Header, Origin, Referer, UserAgent}; use axum::{ headers::{authorization::Bearer, Authorization}, @@ -31,7 +32,7 @@ pub async fn rpc_keys_get( ) -> Web3ProxyResponse { let user = app.bearer_is_authorized(bearer).await?; - let db_replica = app.db_replica()?; + let db_replica = global_db_replica_conn().await?; // This is basically completely copied from sea-orm. Not optimal, but it keeps the format identical to before (while adding the final key) // We could also pack the below stuff into it's subfield, but then we would destroy the format. Both options are fine for now though @@ -160,7 +161,7 @@ pub async fn rpc_keys_management( let user = app.bearer_is_authorized(bearer).await?; - let db_replica = app.db_replica()?; + let db_replica = global_db_replica_conn().await?; let mut uk = match payload.key_id { Some(existing_key_id) => { @@ -340,9 +341,9 @@ pub async fn rpc_keys_management( } let uk = if uk.is_changed() { - let db_conn = app.db_conn()?; + let db_conn = global_db_conn().await?; - uk.save(db_conn) + uk.save(&db_conn) .await .web3_context("Failed saving user key")? } else { diff --git a/web3_proxy/src/frontend/users/stats.rs b/web3_proxy/src/frontend/users/stats.rs index 917311f3..8f55bf64 100644 --- a/web3_proxy/src/frontend/users/stats.rs +++ b/web3_proxy/src/frontend/users/stats.rs @@ -1,6 +1,7 @@ //! Handle registration, logins, and managing account data. use crate::app::Web3ProxyApp; use crate::errors::{Web3ProxyErrorContext, Web3ProxyResponse}; +use crate::globals::global_db_replica_conn; use crate::http_params::{ get_chain_id_from_params, get_page_from_params, get_query_start_from_params, }; @@ -46,7 +47,7 @@ pub async fn user_revert_logs_get( response.insert("chain_id", json!(chain_id)); response.insert("query_start", json!(query_start.timestamp() as u64)); - let db_replica = app.db_replica()?; + let db_replica = global_db_replica_conn().await?; let uks = rpc_key::Entity::find() .filter(rpc_key::Column::UserId.eq(user.id)) diff --git a/web3_proxy/src/frontend/users/subuser.rs b/web3_proxy/src/frontend/users/subuser.rs index 19be184d..f744c1d3 100644 --- a/web3_proxy/src/frontend/users/subuser.rs +++ b/web3_proxy/src/frontend/users/subuser.rs @@ -2,6 +2,7 @@ use crate::app::Web3ProxyApp; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResponse}; use crate::frontend::authorization::RpcSecretKey; +use crate::globals::{global_db_conn, global_db_replica_conn}; use anyhow::Context; use axum::{ extract::Query, @@ -35,7 +36,7 @@ pub async fn get_keys_as_subuser( // First, authenticate let subuser = app.bearer_is_authorized(bearer).await?; - let db_replica = app.db_replica()?; + let db_replica = global_db_replica_conn().await?; // TODO: JOIN over RPC_KEY, SUBUSER, PRIMARY_USER and return these items @@ -100,7 +101,7 @@ pub async fn get_subusers( // First, authenticate let user = app.bearer_is_authorized(bearer).await?; - let db_replica = app.db_replica()?; + let db_replica = global_db_replica_conn().await?; let rpc_key: u64 = params .remove("key_id") @@ -172,7 +173,7 @@ pub async fn modify_subuser( // First, authenticate let user = app.bearer_is_authorized(bearer).await?; - let db_replica = app.db_replica()?; + let db_replica = global_db_replica_conn().await?; trace!("Parameters are: {:?}", params); @@ -256,7 +257,8 @@ pub async fn modify_subuser( } // TODO: There is a good chunk of duplicate logic as login-post. Consider refactoring ... - let db_conn = app.db_conn()?; + let db_conn = global_db_conn().await?; + let (subuser, _subuser_rpc_keys, _status_code) = match subuser { None => { // First add a user; the only thing we need from them is an address @@ -332,12 +334,12 @@ pub async fn modify_subuser( let mut active_subuser_entry_secondary_user = secondary_user.into_active_model(); if !keep_subuser { // Remove the user - active_subuser_entry_secondary_user.delete(db_conn).await?; + active_subuser_entry_secondary_user.delete(&db_conn).await?; action = "removed"; } else { // Just change the role active_subuser_entry_secondary_user.role = sea_orm::Set(new_role.clone()); - active_subuser_entry_secondary_user.save(db_conn).await?; + active_subuser_entry_secondary_user.save(&db_conn).await?; action = "role modified"; } } @@ -348,7 +350,7 @@ pub async fn modify_subuser( role: sea_orm::Set(new_role.clone()), ..Default::default() }; - active_subuser_entry_secondary_user.insert(db_conn).await?; + active_subuser_entry_secondary_user.insert(&db_conn).await?; action = "added"; } _ => { diff --git a/web3_proxy/src/globals.rs b/web3_proxy/src/globals.rs new file mode 100644 index 00000000..5b67c173 --- /dev/null +++ b/web3_proxy/src/globals.rs @@ -0,0 +1,55 @@ +use crate::{errors::Web3ProxyError, relational_db::DatabaseReplica}; +use derivative::Derivative; +use migration::{ + sea_orm::{DatabaseConnection, DatabaseTransaction, TransactionTrait}, + DbErr, +}; +use std::sync::{Arc, LazyLock}; +use tokio::sync::RwLock as AsyncRwLock; + +pub static DB_CONN: LazyLock>> = + LazyLock::new(|| AsyncRwLock::new(Err(DatabaseError::NotConfigured))); + +pub static DB_REPLICA: LazyLock>> = + LazyLock::new(|| AsyncRwLock::new(Err(DatabaseError::NotConfigured))); + +#[derive(Clone, Debug, Derivative)] +pub enum DatabaseError { + /// no database configured. depending on what you need, this may or may not be a problem + NotConfigured, + /// an error that happened when creating the connection pool + Connect(Arc), + /// an error that just happened + Begin(Arc), +} + +impl From for Web3ProxyError { + fn from(value: DatabaseError) -> Self { + match value { + DatabaseError::NotConfigured => Self::NoDatabaseConfigured, + DatabaseError::Connect(err) | DatabaseError::Begin(err) => Self::DatabaseArc(err), + } + } +} + +#[inline] +pub async fn global_db_conn() -> Result { + DB_CONN.read().await.clone() +} + +#[inline] +pub async fn global_db_transaction() -> Result { + let x = global_db_conn().await?; + + let x = x + .begin() + .await + .map_err(|x| DatabaseError::Begin(Arc::new(x)))?; + + Ok(x) +} + +#[inline] +pub async fn global_db_replica_conn() -> Result { + DB_REPLICA.read().await.clone() +} diff --git a/web3_proxy/src/lib.rs b/web3_proxy/src/lib.rs index 6919a8b8..d7476eb3 100644 --- a/web3_proxy/src/lib.rs +++ b/web3_proxy/src/lib.rs @@ -1,3 +1,4 @@ +#![feature(lazy_cell)] #![feature(let_chains)] #![feature(trait_alias)] #![forbid(unsafe_code)] @@ -11,6 +12,7 @@ pub mod compute_units; pub mod config; pub mod errors; pub mod frontend; +pub mod globals; pub mod http_params; pub mod jsonrpc; pub mod pagerduty; diff --git a/web3_proxy/src/relational_db.rs b/web3_proxy/src/relational_db.rs index a4874b14..400fb00f 100644 --- a/web3_proxy/src/relational_db.rs +++ b/web3_proxy/src/relational_db.rs @@ -34,15 +34,15 @@ pub async fn connect_db( // TODO: load all these options from the config file. i think docker mysql default max is 100 // Amazon RDS Proxy default idle timeout is 1800 seconds - // TODO: sqlx info logging is way too verbose for production. db_opt - .acquire_timeout(Duration::from_secs(5)) - .connect_timeout(Duration::from_secs(5)) + .acquire_timeout(Duration::from_secs(3)) + .connect_timeout(Duration::from_secs(3)) .idle_timeout(Duration::from_secs(1795)) - .min_connections(min_connections) .max_connections(max_connections) - .sqlx_logging_level(tracing::log::LevelFilter::Trace) - .sqlx_logging(true); + .max_lifetime(Duration::from_secs(60)) + .min_connections(min_connections) + .sqlx_logging(true) + .sqlx_logging_level(tracing::log::LevelFilter::Trace); Database::connect(db_opt).await } @@ -76,33 +76,23 @@ pub async fn migrate_db( .col(ColumnDef::new(Alias::new("locked")).boolean().default(true)), ); - loop { - if Migrator::get_pending_migrations(db_conn).await?.is_empty() { - info!("no migrations to apply"); - return Ok(()); + if Migrator::get_pending_migrations(db_conn).await?.is_empty() { + info!("no migrations to apply"); + return Ok(()); + } + + info!("checking migration lock..."); + + // there are migrations to apply + // acquire a lock + if db_conn.execute(create_lock_statment).await.is_err() { + if override_existing_lock { + warn!("OVERRIDING EXISTING LOCK in 10 seconds! ctrl+c now if other migrations are actually running!"); + + sleep(Duration::from_secs(10)).await + } else { + return Err(anyhow::anyhow!("Unable to acquire lock. if you are positive no migration is running, run \"web3_proxy_cli drop_migration_lock\"")); } - - info!("waiting for migration lock..."); - - // there are migrations to apply - // acquire a lock - if let Err(err) = db_conn.execute(create_lock_statment.clone()).await { - if override_existing_lock { - warn!("OVERRIDING EXISTING LOCK in 10 seconds! ctrl+c now if other migrations are actually running!"); - - sleep(Duration::from_secs(10)).await - } else { - debug!("Unable to acquire lock. if you are positive no migration is running, run \"web3_proxy_cli drop_migration_lock\". err={:?}", err); - - // TODO: exponential backoff with jitter? - sleep(Duration::from_secs(1)).await; - - continue; - } - } - - debug!("migration lock acquired"); - break; } info!("migrating..."); diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index ef0408f1..fdfff3ae 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -179,7 +179,6 @@ impl Web3Rpcs { return None; } - let db_conn = app.db_conn().ok().cloned(); let http_client = app.http_client.clone(); let vredis_pool = app.vredis_pool.clone(); @@ -197,7 +196,6 @@ impl Web3Rpcs { let handle = tokio::spawn(server_config.spawn( server_name, - db_conn, vredis_pool, chain_id, block_interval, @@ -372,11 +370,11 @@ impl Web3Rpcs { } if let Err(e) = try_join_all(futures).await { - error!("subscriptions over: {:?}", self); + error!(?self, "subscriptions over"); return Err(e); } - info!("subscriptions over: {:?}", self); + info!(?self, "subscriptions over"); Ok(()) } diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index cb833a02..3b986d11 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -92,7 +92,6 @@ impl Web3Rpc { config: Web3RpcConfig, name: String, chain_id: u64, - db_conn: Option, // optional because this is only used for http providers. websocket-only providers don't use it http_client: Option, redis_pool: Option, @@ -183,7 +182,6 @@ impl Web3Rpc { block_data_limit, block_interval, created_at: Some(created_at), - db_conn, display_name: config.display_name, hard_limit, hard_limit_until: Some(hard_limit_until), @@ -469,7 +467,7 @@ impl Web3Rpc { let age = self.created_at.unwrap().elapsed().as_millis(); - debug!("clearing head block on {} ({}ms old)!", self, age); + trace!("clearing head block on {} ({}ms old)!", self, age); // send an empty block to take this server out of rotation head_block_sender.send_replace(None); @@ -676,7 +674,7 @@ impl Web3Rpc { disconnect_watch_rx.changed().await?; } - info!("disconnect triggered on {}", rpc); + trace!("disconnect triggered on {}", rpc); Ok(()) }; @@ -723,7 +721,7 @@ impl Web3Rpc { sleep(Duration::from_secs(health_sleep_seconds)).await; } - debug!("healthcheck loop on {} exited", rpc); + trace!("healthcheck loop on {} exited", rpc); Ok(()) }; @@ -747,6 +745,7 @@ impl Web3Rpc { } // exit if any of the futures exit + // TODO: have an enum for which one exited? let first_exit = futures.next().await; debug!(?first_exit, "subscriptions on {} exited", self); @@ -858,7 +857,7 @@ impl Web3Rpc { .await?; if *subscribe_stop_rx.borrow() { - debug!(%self, "new heads subscription exited"); + trace!(%self, "new heads subscription exited"); Ok(()) } else { Err(anyhow!("new_heads subscription exited. reconnect needed").into()) diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 921eb709..40d146c8 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -1,8 +1,8 @@ use super::one::Web3Rpc; use crate::errors::{Web3ProxyErrorContext, Web3ProxyResult}; use crate::frontend::authorization::{Authorization, AuthorizationType}; +use crate::globals::{global_db_conn, DB_CONN}; use crate::jsonrpc::{JsonRpcParams, JsonRpcResultData}; -use anyhow::Context; use chrono::Utc; use derive_more::From; use entities::revert_log; @@ -90,7 +90,7 @@ impl Authorization { } }; - let db_conn = self.db_conn.as_ref().context("no database connection")?; + let db_conn = global_db_conn().await?; // TODO: should the database set the timestamp? // we intentionally use "now" and not the time the request started @@ -111,7 +111,7 @@ impl Authorization { }; let rl = rl - .save(db_conn) + .save(&db_conn) .await .web3_context("Failed saving new revert log")?; @@ -225,7 +225,7 @@ impl OpenRequestHandle { if !["eth_call", "eth_estimateGas"].contains(&method) { // trace!(%method, "skipping save on revert"); RequestErrorHandler::TraceLevel - } else if self.authorization.db_conn.is_some() { + } else if DB_CONN.read().await.is_ok() { let log_revert_chance = self.authorization.checks.log_revert_chance; if log_revert_chance == 0 { @@ -378,6 +378,7 @@ impl OpenRequestHandle { match serde_json::from_value::(json!(params)) { Ok(params) => { // spawn saving to the database so we don't slow down the request + // TODO: log if this errors let f = self.authorization.clone().save_revert(method, params.0 .0); tokio::spawn(f); diff --git a/web3_proxy/src/stats/db_queries.rs b/web3_proxy/src/stats/db_queries.rs index 7ffac642..055886b0 100644 --- a/web3_proxy/src/stats/db_queries.rs +++ b/web3_proxy/src/stats/db_queries.rs @@ -1,6 +1,7 @@ use super::StatType; use crate::app::Web3ProxyApp; use crate::errors::{Web3ProxyError, Web3ProxyResponse, Web3ProxyResult}; +use crate::globals::{global_db_conn, global_db_replica_conn}; use crate::http_params::{ get_chain_id_from_params, get_page_from_params, get_query_start_from_params, get_query_window_seconds_from_params, get_user_id_from_params, @@ -61,13 +62,13 @@ pub async fn query_user_stats<'a>( params: &'a HashMap, stat_response_type: StatType, ) -> Web3ProxyResponse { - let db_conn = app.db_conn()?; - let db_replica = app.db_replica()?; + let db_conn = global_db_conn().await?; + let db_replica = global_db_replica_conn().await?; let mut redis_conn = app.redis_conn().await?; // get the user id first. if it is 0, we should use a cache on the app let user_id = - get_user_id_from_params(&mut redis_conn, db_conn, db_replica, bearer, params).await?; + get_user_id_from_params(&mut redis_conn, &db_conn, &db_replica, bearer, params).await?; // get the query window seconds now so that we can pick a cache with a good TTL // TODO: for now though, just do one cache. its easier let query_window_seconds = get_query_window_seconds_from_params(params)?; diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs index 3f2a3c7a..f801f9da 100644 --- a/web3_proxy/src/stats/influxdb_queries.rs +++ b/web3_proxy/src/stats/influxdb_queries.rs @@ -1,5 +1,6 @@ use super::StatType; use crate::errors::Web3ProxyErrorContext; +use crate::globals::global_db_replica_conn; use crate::{ app::Web3ProxyApp, errors::{Web3ProxyError, Web3ProxyResponse}, @@ -47,7 +48,7 @@ pub async fn query_user_stats<'a>( )); } - let db_replica = app.db_replica()?; + let db_replica = global_db_replica_conn().await?; // Read the (optional) user-id from the request, this is the logic for subusers // If there is no bearer token, this is not allowed diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index 9f5da9db..f61004f3 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -527,7 +527,7 @@ impl RpcQueryStats { let mut authorization = metadata.authorization.take(); if authorization.is_none() { - authorization = Some(Arc::new(Authorization::internal(None)?)); + authorization = Some(Arc::new(Authorization::internal()?)); } let authorization = authorization.expect("Authorization will always be set"); diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs index a16044be..da30ca87 100644 --- a/web3_proxy/src/stats/stat_buffer.rs +++ b/web3_proxy/src/stats/stat_buffer.rs @@ -2,13 +2,13 @@ use super::{AppStat, FlushedStats, RpcQueryKey}; use crate::app::Web3ProxyJoinHandle; use crate::caches::{RpcSecretKeyCache, UserBalanceCache}; use crate::errors::Web3ProxyResult; +use crate::globals::global_db_conn; use crate::stats::RpcQueryStats; use derive_more::From; use futures::stream; use hashbrown::HashMap; use influxdb2::api::write::TimestampPrecision; use migration::sea_orm::prelude::Decimal; -use migration::sea_orm::DatabaseConnection; use std::time::Duration; use tokio::sync::{broadcast, mpsc, oneshot}; use tokio::time::{interval, sleep}; @@ -45,7 +45,6 @@ pub struct StatBuffer { accounting_db_buffer: HashMap, billing_period_seconds: i64, chain_id: u64, - db_conn: Option, db_save_interval_seconds: u32, global_timeseries_buffer: HashMap, influxdb_bucket: Option, @@ -64,7 +63,6 @@ impl StatBuffer { pub fn try_spawn( billing_period_seconds: i64, chain_id: u64, - db_conn: Option, db_save_interval_seconds: u32, influxdb_bucket: Option, mut influxdb_client: Option, @@ -79,10 +77,6 @@ impl StatBuffer { influxdb_client = None; } - if db_conn.is_none() && influxdb_client.is_none() { - return Ok(None); - } - let (stat_sender, stat_receiver) = mpsc::unbounded_channel(); let timestamp_precision = TimestampPrecision::Seconds; @@ -91,7 +85,6 @@ impl StatBuffer { accounting_db_buffer: Default::default(), billing_period_seconds, chain_id, - db_conn, db_save_interval_seconds, global_timeseries_buffer: Default::default(), influxdb_bucket, @@ -142,51 +135,50 @@ impl StatBuffer { let mut approximate_balance_remaining = 0.into(); // TODO: re-enable this once I know its not the cause of Polygon W3P crashing all the time - if false { - if let Some(db_conn) = self.db_conn.as_ref() { - let user_id = stat.authorization.checks.user_id; + // TODO: we want to do this even if the db is down. we need to buffer if there is an outage! + if let Ok(db_conn) = global_db_conn().await { + let user_id = stat.authorization.checks.user_id; - // update the user's balance - if user_id != 0 { - // update the user's cached balance - let mut user_balance = stat.authorization.checks.latest_balance.write().await; + // update the user's balance + if user_id != 0 { + // update the user's cached balance + let mut user_balance = stat.authorization.checks.latest_balance.write().await; - // TODO: move this to a helper function - user_balance.total_frontend_requests += 1; - user_balance.total_spent += stat.compute_unit_cost; + // TODO: move this to a helper function + user_balance.total_frontend_requests += 1; + user_balance.total_spent += stat.compute_unit_cost; - if !stat.backend_rpcs_used.is_empty() { - user_balance.total_cache_misses += 1; - } - - // if paid_credits_used is true, then they were premium at the start of the request - if stat.authorization.checks.paid_credits_used { - // TODO: this lets them get a negative remaining balance. we should clear if close to 0 - user_balance.total_spent_paid_credits += stat.compute_unit_cost; - - // check if they still have premium - if user_balance.active_premium() { - // TODO: referall credits here? i think in the save_db section still makes sense for those - } else if let Err(err) = self.user_balance_cache.invalidate(&user_balance.user_id, db_conn, &self.rpc_secret_key_cache).await { - // was premium, but isn't anymore due to paying for this query. clear the cache - // TODO: stop at <$0.000001 instead of negative? - warn!(?err, "unable to clear caches"); - } - } else if user_balance.active_premium() { - // paid credits were not used, but now we have active premium. invalidate the caches - // TODO: this seems unliekly. should we warn if this happens so we can investigate? - if let Err(err) = self.user_balance_cache.invalidate(&user_balance.user_id, db_conn, &self.rpc_secret_key_cache).await { - // was premium, but isn't anymore due to paying for this query. clear the cache - // TODO: stop at <$0.000001 instead of negative? - warn!(?err, "unable to clear caches"); - } - } - - approximate_balance_remaining = user_balance.remaining(); + if !stat.backend_rpcs_used.is_empty() { + user_balance.total_cache_misses += 1; } - // self.accounting_db_buffer.entry(stat.accounting_key(self.billing_period_seconds)).or_default().add(stat.clone(), approximate_balance_remaining).await; + // if paid_credits_used is true, then they were premium at the start of the request + if stat.authorization.checks.paid_credits_used { + // TODO: this lets them get a negative remaining balance. we should clear if close to 0 + user_balance.total_spent_paid_credits += stat.compute_unit_cost; + + // check if they still have premium + if user_balance.active_premium() { + // TODO: referall credits here? i think in the save_db section still makes sense for those + } else if let Err(err) = self.user_balance_cache.invalidate(&user_balance.user_id, &db_conn, &self.rpc_secret_key_cache).await { + // was premium, but isn't anymore due to paying for this query. clear the cache + // TODO: stop at <$0.000001 instead of negative? + warn!(?err, "unable to clear caches"); + } + } else if user_balance.active_premium() { + // paid credits were not used, but now we have active premium. invalidate the caches + // TODO: this seems unliekly. should we warn if this happens so we can investigate? + if let Err(err) = self.user_balance_cache.invalidate(&user_balance.user_id, &db_conn, &self.rpc_secret_key_cache).await { + // was premium, but isn't anymore due to paying for this query. clear the cache + // TODO: stop at <$0.000001 instead of negative? + warn!(?err, "unable to clear caches"); + } + } + + approximate_balance_remaining = user_balance.remaining(); } + + self.accounting_db_buffer.entry(stat.accounting_key(self.billing_period_seconds)).or_default().add(stat.clone(), approximate_balance_remaining).await; } if self.influxdb_client.is_some() { @@ -259,8 +251,8 @@ impl StatBuffer { // TODO: wait on all websockets to close // TODO: wait on all pending external requests to finish - info!("waiting 10 seconds for remaining stats to arrive"); - sleep(Duration::from_secs(10)).await; + info!("waiting 5 seconds for remaining stats to arrive"); + sleep(Duration::from_secs(5)).await; // loop { // // nope. this won't ever be true because we keep making stats for internal requests @@ -289,7 +281,7 @@ impl StatBuffer { async fn save_relational_stats(&mut self) -> usize { let mut count = 0; - if let Some(db_conn) = self.db_conn.as_ref() { + if let Ok(db_conn) = global_db_conn().await { count = self.accounting_db_buffer.len(); for (key, stat) in self.accounting_db_buffer.drain() { // TODO: batch saves @@ -297,7 +289,7 @@ impl StatBuffer { if let Err(err) = stat .save_db( self.chain_id, - db_conn, + &db_conn, key, &self.user_balance_cache, &self.rpc_secret_key_cache, diff --git a/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs b/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs index 976f052f..016c04dd 100644 --- a/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs +++ b/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs @@ -78,7 +78,6 @@ impl MigrateStatsToV2SubCommand { let emitter_spawn = StatBuffer::try_spawn( BILLING_PERIOD_SECONDS, top_config.app.chain_id, - Some(db_conn.clone()), 30, top_config.app.influxdb_bucket.clone(), influxdb_client.clone(), @@ -117,8 +116,8 @@ impl MigrateStatsToV2SubCommand { // (2) Create request metadata objects to match the old data // Iterate through all old rows, and put them into the above objects. for x in old_records.iter() { - let mut authorization = Authorization::internal(None) - .context("failed creating internal authorization")?; + let mut authorization = + Authorization::internal().context("failed creating internal authorization")?; match x.rpc_key_id { Some(rpc_key_id) => { diff --git a/web3_proxy/src/sub_commands/proxyd.rs b/web3_proxy/src/sub_commands/proxyd.rs index aabb6088..f61b4e73 100644 --- a/web3_proxy/src/sub_commands/proxyd.rs +++ b/web3_proxy/src/sub_commands/proxyd.rs @@ -1,6 +1,7 @@ use crate::app::{flatten_handle, flatten_handles, Web3ProxyApp}; use crate::compute_units::default_usd_per_cu; use crate::config::TopConfig; +use crate::globals::global_db_conn; use crate::stats::FlushedStats; use crate::{frontend, prometheus}; use argh::FromArgs; @@ -148,6 +149,10 @@ impl ProxydSubCommand { prometheus_shutdown_receiver, )); + if spawned_app.app.config.db_url.is_some() { + // give 30 seconds for the db to connect. if it does not connect, it will keep retrying + } + info!("waiting for head block"); let max_wait_until = Instant::now() + Duration::from_secs(35); loop { @@ -286,7 +291,8 @@ impl ProxydSubCommand { } } - if let Ok(db_conn) = spawned_app.app.db_conn().cloned() { + // TODO: make sure this happens even if we exit with an error + if let Ok(db_conn) = global_db_conn().await { /* From the sqlx docs: diff --git a/web3_proxy/src/sub_commands/transfer_key.rs b/web3_proxy/src/sub_commands/transfer_key.rs index f4fa4862..9b4b9c72 100644 --- a/web3_proxy/src/sub_commands/transfer_key.rs +++ b/web3_proxy/src/sub_commands/transfer_key.rs @@ -52,7 +52,7 @@ impl TransferKeySubCommand { uk.user_id = sea_orm::Set(new_u.id); - let _uk = uk.save(db_conn).await?; + uk.save(db_conn).await?; info!("changed the key's owner"); } diff --git a/web3_proxy/tests/common/app.rs b/web3_proxy/tests/common/app.rs index 6141ba2f..13e10027 100644 --- a/web3_proxy/tests/common/app.rs +++ b/web3_proxy/tests/common/app.rs @@ -4,20 +4,20 @@ use ethers::{ types::Address, }; use hashbrown::HashMap; -use parking_lot::Mutex; use serde_json::json; use std::{ env, str::FromStr, sync::atomic::{AtomicU16, Ordering}, + thread, }; use std::{sync::Arc, time::Duration}; use tokio::{ + runtime::Builder, sync::{ broadcast::{self, error::SendError}, mpsc, oneshot, }, - task::JoinHandle, time::{sleep, Instant}, }; use tracing::info; @@ -28,8 +28,9 @@ use web3_proxy::{ }; pub struct TestApp { - /// spawn handle for the proxy. - pub proxy_handle: Mutex>>>, + /// **THREAD** (not async) handle for the proxy. + /// In an Option so we can take it and not break the `impl Drop` + pub proxy_handle: Option>>, /// connection to the proxy that is connected to anil. pub proxy_provider: Provider, @@ -96,16 +97,29 @@ impl TestApp { // spawn the app // TODO: spawn in a thread so we can run from non-async tests and so the Drop impl can wait for it to stop let handle = { - tokio::spawn(ProxydSubCommand::_main( - top_config, - None, - frontend_port_arc.clone(), - prometheus_port_arc, - num_workers, - shutdown_sender.clone(), - flush_stat_buffer_sender.clone(), - flush_stat_buffer_receiver, - )) + let frontend_port_arc = frontend_port_arc.clone(); + let prometheus_port_arc = prometheus_port_arc.clone(); + let flush_stat_buffer_sender = flush_stat_buffer_sender.clone(); + let shutdown_sender = shutdown_sender.clone(); + + thread::spawn(move || { + let runtime = Builder::new_multi_thread() + .enable_all() + .worker_threads(2) + .build() + .unwrap(); + + runtime.block_on(ProxydSubCommand::_main( + top_config, + None, + frontend_port_arc, + prometheus_port_arc, + num_workers, + shutdown_sender, + flush_stat_buffer_sender, + flush_stat_buffer_receiver, + )) + }) }; let mut frontend_port = frontend_port_arc.load(Ordering::Relaxed); @@ -125,7 +139,7 @@ impl TestApp { let proxy_provider = Provider::::try_from(proxy_endpoint).unwrap(); Self { - proxy_handle: Mutex::new(Some(handle)), + proxy_handle: Some(handle), proxy_provider, flush_stat_buffer_sender, shutdown_sender, @@ -148,15 +162,11 @@ impl TestApp { } #[allow(unused)] - pub async fn wait(&self) { + pub fn wait_for_stop(mut self) { let _ = self.stop(); - // TODO: lock+take feels weird, but it works - let handle = self.proxy_handle.lock().take(); - - if let Some(handle) = handle { - info!("waiting for the app to stop..."); - handle.await.unwrap().unwrap(); + if let Some(handle) = self.proxy_handle.take() { + handle.join().unwrap(); } } } diff --git a/web3_proxy/tests/common/create_admin.rs b/web3_proxy/tests/common/create_admin.rs index 48bfebd0..3c553e21 100644 --- a/web3_proxy/tests/common/create_admin.rs +++ b/web3_proxy/tests/common/create_admin.rs @@ -1,6 +1,7 @@ use crate::TestApp; use ethers::prelude::{LocalWallet, Signer}; use ethers::types::Signature; +use http::StatusCode; use tracing::info; use web3_proxy::frontend::users::authentication::{LoginPostResponse, PostLogin}; use web3_proxy::sub_commands::ChangeAdminStatusSubCommand; @@ -80,17 +81,18 @@ pub async fn create_user_as_admin( admin_wallet.address() ); let admin_login_message = r.get(admin_login_get_url).send().await.unwrap(); - let admin_login_message = admin_login_message.text().await.unwrap(); + + assert_eq!(admin_login_message.status(), StatusCode::OK); + + let admin_login_text = admin_login_message.text().await.unwrap(); + info!(?admin_login_text); // Sign the message and POST it to login as admin - let admin_signed: Signature = admin_wallet - .sign_message(&admin_login_message) - .await - .unwrap(); + let admin_signed: Signature = admin_wallet.sign_message(&admin_login_text).await.unwrap(); info!(?admin_signed); let admin_post_login_data = PostLogin { - msg: admin_login_message, + msg: admin_login_text, sig: admin_signed.to_string(), referral_code: None, }; diff --git a/web3_proxy/tests/common/create_user.rs b/web3_proxy/tests/common/create_user.rs index a5695a55..f5a47e53 100644 --- a/web3_proxy/tests/common/create_user.rs +++ b/web3_proxy/tests/common/create_user.rs @@ -2,11 +2,12 @@ use crate::TestApp; use entities::{user, user_tier}; use ethers::prelude::{LocalWallet, Signer}; use ethers::types::Signature; +use http::StatusCode; use migration::sea_orm::{ self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel, QueryFilter, }; -use tracing::info; +use tracing::{info, trace}; use web3_proxy::errors::Web3ProxyResult; use web3_proxy::frontend::users::authentication::{LoginPostResponse, PostLogin}; @@ -38,15 +39,20 @@ pub async fn create_user( }; info!(?user_post_login_data); - let mut user_login_response = r + let user_login_response = r .post(&login_post_url) .json(&user_post_login_data) .send() .await - .unwrap() - .json::() - .await .unwrap(); + trace!(?user_login_response); + + assert_eq!(user_login_response.status(), StatusCode::OK); + + let user_login_text = user_login_response.text().await.unwrap(); + trace!("user_login_text: {:#}", user_login_text); + + let user_login_response: LoginPostResponse = serde_json::from_str(&user_login_text).unwrap(); info!(?user_login_response); user_login_response diff --git a/web3_proxy/tests/common/mysql.rs b/web3_proxy/tests/common/mysql.rs index e4989693..13fe144d 100644 --- a/web3_proxy/tests/common/mysql.rs +++ b/web3_proxy/tests/common/mysql.rs @@ -8,7 +8,7 @@ use tokio::{ time::{sleep, Instant}, }; use tracing::{info, trace, warn}; -use web3_proxy::relational_db::get_migrated_db; +use web3_proxy::relational_db::{connect_db, get_migrated_db}; /// on drop, the mysql docker container will be shut down pub struct TestMysql { @@ -166,9 +166,7 @@ impl TestMysql { } pub async fn conn(&self) -> DatabaseConnection { - get_migrated_db(self.url.clone().unwrap(), 1, 5) - .await - .unwrap() + connect_db(self.url.clone().unwrap(), 1, 5).await.unwrap() } } diff --git a/web3_proxy/tests/test_admins.rs b/web3_proxy/tests/test_admins.rs index f23dcc30..c15869f0 100644 --- a/web3_proxy/tests/test_admins.rs +++ b/web3_proxy/tests/test_admins.rs @@ -62,7 +62,7 @@ async fn test_admin_grant_credits() { let user_balance = user_get_balance(&x, &r, &user_login_response).await; assert_eq!(user_balance.remaining(), Decimal::from(100)); - x.wait().await; + x.wait_for_stop(); } // #[cfg_attr(not(feature = "tests-needing-docker"), ignore)] diff --git a/web3_proxy/tests/test_proxy.rs b/web3_proxy/tests/test_proxy.rs index 4013292b..3fc76a17 100644 --- a/web3_proxy/tests/test_proxy.rs +++ b/web3_proxy/tests/test_proxy.rs @@ -97,8 +97,10 @@ async fn it_starts_and_stops() { assert_eq!(anvil_result, proxy_result.unwrap()); // this won't do anything since stats aren't tracked when there isn't a db - x.flush_stats().await.unwrap_err(); + let flushed = x.flush_stats().await.unwrap(); + assert_eq!(flushed.relational, 0); + assert_eq!(flushed.timeseries, 0); // most tests won't need to wait, but we should wait here to be sure all the shutdown logic works properly - x.wait().await; + x.wait_for_stop(); }