Global db (#191)

* dont migrate on start. just connect

* it compiles

* that log is loud

* get transaction from a local clone

* import the trait

* put the test app into a thread instead of tokio::spawn

* fix one test

* try db before rpcs

* db connection is too slow. need to wait for it

* do db setup once while spawning
This commit is contained in:
Bryan Stitt 2023-07-14 18:30:01 -07:00 committed by GitHub
parent 269aa32260
commit 0046a02a4a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 580 additions and 394 deletions

@ -1,5 +1,6 @@
use crate::app::Web3ProxyApp;
use crate::errors::{Web3ProxyError, Web3ProxyResponse};
use crate::globals::{global_db_conn, global_db_replica_conn};
use crate::http_params::get_user_id_from_params;
use axum::response::IntoResponse;
use axum::{
@ -42,8 +43,8 @@ pub async fn query_admin_modify_usertier<'a>(
let mut response_body = HashMap::new();
// Establish connections
let db_conn = app.db_conn()?;
let db_replica = app.db_replica()?;
let db_conn = global_db_conn().await?;
let db_replica = global_db_replica_conn().await?;
let mut redis_conn = app.redis_conn().await?;
// Will modify logic here
@ -52,14 +53,14 @@ pub async fn query_admin_modify_usertier<'a>(
// TODO: Make a single query, where you retrieve the user, and directly from it the secondary user (otherwise we do two jumpy, which is unnecessary)
// get the user id first. if it is 0, we should use a cache on the app
let caller_id =
get_user_id_from_params(&mut redis_conn, db_conn, db_replica, bearer, params).await?;
get_user_id_from_params(&mut redis_conn, &db_conn, &db_replica, bearer, params).await?;
trace!(%caller_id, "query_admin_modify_usertier");
// Check if the caller is an admin (i.e. if he is in an admin table)
let _admin = admin::Entity::find()
.filter(admin::Column::UserId.eq(caller_id))
.one(db_conn)
.one(&db_conn)
.await?
.ok_or(Web3ProxyError::AccessDenied("not an admin".into()))?;
@ -68,7 +69,7 @@ pub async fn query_admin_modify_usertier<'a>(
// Fetch the admin, and the user
let user: user::Model = user::Entity::find()
.filter(user::Column::Address.eq(user_address.as_bytes()))
.one(db_conn)
.one(&db_conn)
.await?
.ok_or(Web3ProxyError::BadRequest(
"No user with this id found".into(),
@ -82,7 +83,7 @@ pub async fn query_admin_modify_usertier<'a>(
// Now we can modify the user's tier
let new_user_tier: user_tier::Model = user_tier::Entity::find()
.filter(user_tier::Column::Title.eq(user_tier_title.clone()))
.one(db_conn)
.one(&db_conn)
.await?
.ok_or(Web3ProxyError::BadRequest(
"User Tier name was not found".into(),
@ -95,7 +96,7 @@ pub async fn query_admin_modify_usertier<'a>(
user.user_tier_id = sea_orm::Set(new_user_tier.id);
user.save(db_conn).await?;
user.save(&db_conn).await?;
info!("user's tier changed");
}
@ -103,7 +104,7 @@ pub async fn query_admin_modify_usertier<'a>(
// Now delete all bearer tokens of this user
login::Entity::delete_many()
.filter(login::Column::UserId.eq(user.id))
.exec(db_conn)
.exec(&db_conn)
.await?;
Ok(Json(&response_body).into_response())

@ -8,11 +8,12 @@ use crate::frontend::authorization::{
Authorization, RequestMetadata, RequestOrMethod, ResponseOrBytes,
};
use crate::frontend::rpc_proxy_ws::ProxyMode;
use crate::globals::{global_db_conn, DatabaseError, DB_CONN, DB_REPLICA};
use crate::jsonrpc::{
JsonRpcErrorData, JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcId,
JsonRpcParams, JsonRpcRequest, JsonRpcRequestEnum, JsonRpcResultData,
};
use crate::relational_db::{connect_db, get_migrated_db, DatabaseConnection, DatabaseReplica};
use crate::relational_db::{connect_db, migrate_db};
use crate::response_cache::{
JsonRpcQueryCacheKey, JsonRpcResponseCache, JsonRpcResponseEnum, JsonRpcResponseWeigher,
};
@ -34,7 +35,7 @@ use ethers::utils::rlp::{Decodable, Rlp};
use futures::future::join_all;
use futures::stream::{FuturesUnordered, StreamExt};
use hashbrown::{HashMap, HashSet};
use migration::sea_orm::{DatabaseTransaction, EntityTrait, PaginatorTrait, TransactionTrait};
use migration::sea_orm::{EntityTrait, PaginatorTrait};
use moka::future::{Cache, CacheBuilder};
use once_cell::sync::OnceCell;
use redis_rate_limiter::redis::AsyncCommands;
@ -49,10 +50,10 @@ use std::str::FromStr;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::{atomic, Arc};
use std::time::Duration;
use tokio::runtime::Runtime;
use tokio::select;
use tokio::sync::{broadcast, mpsc, oneshot, watch, Semaphore};
use tokio::task::JoinHandle;
use tokio::time::timeout;
use tokio::time::{sleep, timeout};
use tracing::{error, info, trace, warn, Level};
// TODO: make this customizable?
@ -87,10 +88,6 @@ pub struct Web3ProxyApp {
/// don't drop this or the sender will stop working
/// TODO: broadcast channel instead?
pub watch_consensus_head_receiver: watch::Receiver<Option<Web3ProxyBlock>>,
/// Optional database for users and accounting
pub db_conn: Option<DatabaseConnection>,
/// Optional read-only database for users and accounting
pub db_replica: Option<DatabaseReplica>,
pub hostname: Option<String>,
pub frontend_port: Arc<AtomicU16>,
/// rate limit anonymous users
@ -178,8 +175,13 @@ impl Web3ProxyApp {
flush_stat_buffer_receiver: mpsc::Receiver<oneshot::Sender<FlushedStats>>,
) -> anyhow::Result<Web3ProxyAppSpawn> {
let stat_buffer_shutdown_receiver = shutdown_sender.subscribe();
let mut config_watcher_shutdown_receiver = shutdown_sender.subscribe();
let mut background_shutdown_receiver = shutdown_sender.subscribe();
let (new_top_config_sender, mut new_top_config_receiver) =
watch::channel(top_config.clone());
new_top_config_receiver.borrow_and_update();
// safety checks on the config
// while i would prefer this to be in a "apply_top_config" function, that is a larger refactor
// TODO: maybe don't spawn with a config at all. have all config updates come through an apply_top_config call
@ -212,62 +214,6 @@ impl Web3ProxyApp {
let important_background_handles: FuturesUnordered<Web3ProxyJoinHandle<()>> =
FuturesUnordered::new();
// connect to the database and make sure the latest migrations have run
let mut db_conn = None::<DatabaseConnection>;
let mut db_replica = None::<DatabaseReplica>;
if let Some(db_url) = top_config.app.db_url.clone() {
let db_min_connections = top_config
.app
.db_min_connections
.unwrap_or(num_workers as u32);
// TODO: what default multiple?
let db_max_connections = top_config
.app
.db_max_connections
.unwrap_or(db_min_connections * 2);
db_conn = Some(
get_migrated_db(db_url.clone(), db_min_connections, db_max_connections).await?,
);
db_replica = if let Some(db_replica_url) = top_config.app.db_replica_url.clone() {
if db_replica_url == db_url {
// url is the same. do not make a new connection or we might go past our max connections
db_conn.clone().map(Into::into)
} else {
let db_replica_min_connections = top_config
.app
.db_replica_min_connections
.unwrap_or(db_min_connections);
let db_replica_max_connections = top_config
.app
.db_replica_max_connections
.unwrap_or(db_max_connections);
let db_replica = connect_db(
db_replica_url,
db_replica_min_connections,
db_replica_max_connections,
)
.await?;
Some(db_replica.into())
}
} else {
// just clone so that we don't need a bunch of checks all over our code
db_conn.clone().map(Into::into)
};
} else {
anyhow::ensure!(
top_config.app.db_replica_url.is_none(),
"if there is a db_replica_url, there must be a db_url"
);
warn!("no database. some features will be disabled");
};
// connect to kafka for logging requests from the /debug/ urls
let mut kafka_producer: Option<rdkafka::producer::FutureProducer> = None;
@ -379,7 +325,6 @@ impl Web3ProxyApp {
let stat_sender = if let Some(spawned_stat_buffer) = StatBuffer::try_spawn(
BILLING_PERIOD_SECONDS,
top_config.app.chain_id,
db_conn.clone(),
60,
top_config.app.influxdb_bucket.clone(),
influxdb_client.clone(),
@ -472,6 +417,7 @@ impl Web3ProxyApp {
let chain_id = top_config.app.chain_id;
// TODO: remove this. it should only be done by apply_top_config
let (balanced_rpcs, balanced_handle, consensus_connections_watcher) = Web3Rpcs::spawn(
chain_id,
top_config.app.max_head_block_lag,
@ -487,6 +433,7 @@ impl Web3ProxyApp {
// prepare a Web3Rpcs to hold all our private connections
// only some chains have this, so this is optional
// TODO: remove this. it should only be done by apply_top_config
let private_rpcs = if top_config.private_rpcs.is_none() {
warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");
None
@ -515,6 +462,7 @@ impl Web3ProxyApp {
// prepare a Web3Rpcs to hold all our 4337 Abstraction Bundler connections
// only some chains have this, so this is optional
// TODO: remove this. it should only be done by apply_top_config
let bundler_4337_rpcs = if top_config.bundler_4337_rpcs.is_none() {
warn!("No bundler_4337_rpcs configured");
None
@ -545,8 +493,6 @@ impl Web3ProxyApp {
balanced_rpcs,
bundler_4337_rpcs,
config: top_config.app.clone(),
db_conn,
db_replica,
frontend_port: frontend_port.clone(),
frontend_ip_rate_limiter,
frontend_registered_user_rate_limiter,
@ -568,31 +514,47 @@ impl Web3ProxyApp {
watch_consensus_head_receiver,
};
// TODO: do apply_top_config once we don't duplicate the db
if let Err(err) = app.apply_top_config_db(&top_config).await {
warn!(?err, "unable to fully apply config while starting!");
};
let app = Arc::new(app);
// watch for config changes
// TODO: initial config reload should be from this channel. not from the call to spawn
let (new_top_config_sender, mut new_top_config_receiver) = watch::channel(top_config);
// TODO: move this to its own function/struct
{
let app = app.clone();
let config_handle = tokio::spawn(async move {
loop {
let new_top_config = new_top_config_receiver.borrow_and_update().to_owned();
if let Err(err) = app.apply_top_config(new_top_config).await {
error!("unable to apply config! {:?}", err);
};
// TODO: compare new and old here? the sender should be doing that already but maybe its better here
new_top_config_receiver
.changed()
.await
.web3_context("failed awaiting top_config change")?;
if let Err(err) = app.apply_top_config_rpcs(&new_top_config).await {
error!(?err, "unable to apply config! Retrying in 10 seconds (or if the config changes)");
select! {
_ = config_watcher_shutdown_receiver.recv() => {
break;
}
_ = sleep(Duration::from_secs(10)) => {}
_ = new_top_config_receiver.changed() => {}
}
} else {
select! {
_ = config_watcher_shutdown_receiver.recv() => {
break;
}
_ = new_top_config_receiver.changed() => {}
}
}
}
Ok(())
});
app_handles.push(config_handle);
important_background_handles.push(config_handle);
}
if important_background_handles.is_empty() {
@ -616,41 +578,167 @@ impl Web3ProxyApp {
})
}
pub async fn apply_top_config(&self, new_top_config: TopConfig) -> Web3ProxyResult<()> {
// TODO: also update self.config from new_top_config.app
pub async fn apply_top_config(&self, new_top_config: &TopConfig) -> Web3ProxyResult<()> {
// TODO: update self.config from new_top_config.app (or move it entirely to a global)
// connect to the db first
let db = self.apply_top_config_db(new_top_config).await;
// then refresh rpcs
let rpcs = self.apply_top_config_rpcs(new_top_config).await;
// error if either failed
// TODO: if both error, log both errors
db?;
rpcs?;
Ok(())
}
async fn apply_top_config_rpcs(&self, new_top_config: &TopConfig) -> Web3ProxyResult<()> {
info!("applying new config");
// connect to the backends
self.balanced_rpcs
.apply_server_configs(self, new_top_config.balanced_rpcs)
let balanced = self
.balanced_rpcs
.apply_server_configs(self, new_top_config.balanced_rpcs.clone())
.await
.web3_context("updating balanced rpcs")?;
.web3_context("updating balanced rpcs");
if let Some(private_rpc_configs) = new_top_config.private_rpcs {
let private = if let Some(private_rpc_configs) = new_top_config.private_rpcs.clone() {
if let Some(ref private_rpcs) = self.private_rpcs {
private_rpcs
.apply_server_configs(self, private_rpc_configs)
.await
.web3_context("updating private_rpcs")?;
.web3_context("updating private_rpcs")
} else {
// TODO: maybe we should have private_rpcs just be empty instead of being None
todo!("handle toggling private_rpcs")
}
}
} else {
Ok(())
};
if let Some(bundler_4337_rpc_configs) = new_top_config.bundler_4337_rpcs {
if let Some(ref bundler_4337_rpcs) = self.bundler_4337_rpcs {
bundler_4337_rpcs
.apply_server_configs(self, bundler_4337_rpc_configs)
.await
.web3_context("updating bundler_4337_rpcs")?;
let bundler_4337 =
if let Some(bundler_4337_rpc_configs) = new_top_config.bundler_4337_rpcs.clone() {
if let Some(ref bundler_4337_rpcs) = self.bundler_4337_rpcs {
bundler_4337_rpcs
.apply_server_configs(self, bundler_4337_rpc_configs.clone())
.await
.web3_context("updating bundler_4337_rpcs")
} else {
// TODO: maybe we should have bundler_4337_rpcs just be empty instead of being None
todo!("handle toggling bundler_4337_rpcs")
}
} else {
// TODO: maybe we should have bundler_4337_rpcs just be empty instead of being None
todo!("handle toggling bundler_4337_rpcs")
}
}
Ok(())
};
info!("config applied successfully");
// TODO: log all the errors if there are multiple
balanced?;
private?;
bundler_4337?;
Ok(())
}
pub async fn apply_top_config_db(&self, new_top_config: &TopConfig) -> Web3ProxyResult<()> {
// TODO: get the actual value
let num_workers = 2;
// connect to the db
// THIS DOES NOT RUN MIGRATIONS!
if let Some(db_url) = new_top_config.app.db_url.clone() {
let db_min_connections = new_top_config
.app
.db_min_connections
.unwrap_or(num_workers as u32);
// TODO: what default multiple?
let db_max_connections = new_top_config
.app
.db_max_connections
.unwrap_or(db_min_connections * 2);
let db_conn = if let Ok(old_db_conn) = global_db_conn().await {
// TODO: compare old settings with new settings. don't always re-use!
Ok(old_db_conn)
} else {
connect_db(db_url.clone(), db_min_connections, db_max_connections)
.await
.map_err(|err| DatabaseError::Connect(Arc::new(err)))
};
let db_replica = if let Ok(db_conn) = db_conn.as_ref() {
let db_replica =
if let Some(db_replica_url) = new_top_config.app.db_replica_url.clone() {
if db_replica_url == db_url {
// url is the same. do not make a new connection or we might go past our max connections
Ok(db_conn.clone().into())
} else {
let db_replica_min_connections = new_top_config
.app
.db_replica_min_connections
.unwrap_or(db_min_connections);
let db_replica_max_connections = new_top_config
.app
.db_replica_max_connections
.unwrap_or(db_max_connections);
let db_replica = if let Ok(old_db_replica) = global_db_conn().await {
// TODO: compare old settings with new settings. don't always re-use!
Ok(old_db_replica)
} else {
connect_db(
db_replica_url,
db_replica_min_connections,
db_replica_max_connections,
)
.await
.map_err(|err| DatabaseError::Connect(Arc::new(err)))
};
// if db_replica is error, but db_conn is success. log error and just use db_conn
if let Err(err) = db_replica.as_ref() {
error!(?err, "db replica is down! using primary");
Ok(db_conn.clone().into())
} else {
db_replica.map(Into::into)
}
}
} else {
// just clone so that we don't need a bunch of checks all over our code
trace!("no db replica configured");
Ok(db_conn.clone().into())
};
// db and replica are connected. try to migrate
if let Err(err) = migrate_db(db_conn, false).await {
error!(?err, "unable to migrate!");
}
db_replica
} else {
db_conn.clone().map(Into::into)
};
let mut locked_conn = DB_CONN.write().await;
let mut locked_replica = DB_REPLICA.write().await;
*locked_conn = db_conn.clone();
*locked_replica = db_replica.clone();
db_conn?;
db_replica?;
info!("set global db connections");
} else if new_top_config.app.db_replica_url.is_some() {
return Err(anyhow::anyhow!("db_replica_url set, but no db_url set!").into());
} else {
warn!("no database. some features will be disabled");
};
Ok(())
}
@ -662,7 +750,7 @@ impl Web3ProxyApp {
pub fn influxdb_client(&self) -> Web3ProxyResult<&influxdb2::Client> {
self.influxdb_client
.as_ref()
.ok_or(Web3ProxyError::NoDatabase)
.ok_or(Web3ProxyError::NoDatabaseConfigured)
}
/// an ethers provider that you can use with ether's abigen.
@ -702,8 +790,8 @@ impl Web3ProxyApp {
#[derive(Default, Serialize)]
struct UserCount(i64);
let user_count: UserCount = if let Ok(db) = self.db_conn() {
match user::Entity::find().count(db).await {
let user_count: UserCount = if let Ok(db) = global_db_conn().await {
match user::Entity::find().count(&db).await {
Ok(user_count) => UserCount(user_count as i64),
Err(err) => {
warn!(?err, "unable to count users");
@ -864,9 +952,7 @@ impl Web3ProxyApp {
method: &str,
params: P,
) -> Web3ProxyResult<R> {
let db_conn = self.db_conn().ok().cloned();
let authorization = Arc::new(Authorization::internal(db_conn)?);
let authorization = Arc::new(Authorization::internal()?);
self.authorized_request(method, params, authorization).await
}
@ -988,29 +1074,9 @@ impl Web3ProxyApp {
Ok((collected, collected_rpcs))
}
#[inline]
pub fn db_conn(&self) -> Web3ProxyResult<&DatabaseConnection> {
self.db_conn.as_ref().ok_or(Web3ProxyError::NoDatabase)
}
#[inline]
pub async fn db_transaction(&self) -> Web3ProxyResult<DatabaseTransaction> {
if let Some(ref db_conn) = self.db_conn {
let x = db_conn.begin().await?;
Ok(x)
} else {
Err(Web3ProxyError::NoDatabase)
}
}
#[inline]
pub fn db_replica(&self) -> Web3ProxyResult<&DatabaseReplica> {
self.db_replica.as_ref().ok_or(Web3ProxyError::NoDatabase)
}
pub async fn redis_conn(&self) -> Web3ProxyResult<redis_rate_limiter::RedisConnection> {
match self.vredis_pool.as_ref() {
None => Err(Web3ProxyError::NoDatabase),
None => Err(Web3ProxyError::NoDatabaseConfigured),
Some(redis_pool) => {
// TODO: add a From for this
let redis_conn = redis_pool.get().await.context("redis pool error")?;
@ -1452,7 +1518,7 @@ impl Web3ProxyApp {
.zadd(recent_tx_hash_key, hashed_tx_hash.to_string(), now)
.await?;
}
Err(Web3ProxyError::NoDatabase) => {},
Err(Web3ProxyError::NoDatabaseConfigured) => {},
Err(err) => {
warn!(
?err,

@ -10,7 +10,7 @@ use migration::{Func, SimpleExpr};
use serde::ser::SerializeStruct;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tracing::info;
use tracing::trace;
/// Implements the balance getter which combines data from several tables
#[derive(Clone, Debug, Default, Deserialize)]
@ -220,8 +220,7 @@ impl Balance {
user_id,
};
// TODO: lower log level
info!("balance: {:#}", json!(&balance));
trace!("balance: {:#}", json!(&balance));
// Return None if there is no entry
Ok(Some(balance))

@ -1,5 +1,5 @@
use crate::balance::Balance;
use crate::errors::Web3ProxyResult;
use crate::errors::{Web3ProxyError, Web3ProxyResult};
use crate::frontend::authorization::{AuthorizationChecks, RpcSecretKey};
use derive_more::From;
use entities::rpc_key;
@ -50,7 +50,7 @@ impl UserBalanceCache {
};
trace!(?x, "from database");
Ok(Arc::new(AsyncRwLock::new(x)))
Ok::<_, Web3ProxyError>(Arc::new(AsyncRwLock::new(x)))
})
.await?;

@ -6,7 +6,6 @@ use ethers::prelude::{Address, TxHash};
use ethers::types::{U256, U64};
use hashbrown::HashMap;
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::DatabaseConnection;
use sentry::types::Dsn;
use serde::Deserialize;
use serde_inline_default::serde_inline_default;
@ -276,7 +275,6 @@ impl Web3RpcConfig {
pub async fn spawn(
self,
name: String,
db_conn: Option<DatabaseConnection>,
redis_pool: Option<redis_rate_limiter::RedisPool>,
chain_id: u64,
block_interval: Duration,
@ -293,7 +291,6 @@ impl Web3RpcConfig {
self,
name,
chain_id,
db_conn,
http_client,
redis_pool,
block_interval,

@ -40,7 +40,6 @@ impl From<Web3ProxyError> for Web3ProxyResult<()> {
}
}
// TODO: replace all String with `Cow<'static, str>`
#[derive(Debug, Display, Error, From)]
pub enum Web3ProxyError {
Abi(ethers::abi::Error),
@ -59,6 +58,7 @@ pub enum Web3ProxyError {
BadRouting,
Contract(ContractError<EthersHttpProvider>),
Database(DbErr),
DatabaseArc(Arc<DbErr>),
Decimal(DecimalError),
EthersHttpClient(ethers::prelude::HttpClientError),
EthersProvider(ethers::prelude::ProviderError),
@ -100,7 +100,7 @@ pub enum Web3ProxyError {
NoBlockNumberOrHash,
NoBlocksKnown,
NoConsensusHeadBlock,
NoDatabase,
NoDatabaseConfigured,
NoHandleReady,
NoServersSynced,
#[display(fmt = "{}/{}", num_known, min_head_rpcs)]
@ -271,6 +271,17 @@ impl Web3ProxyError {
},
)
}
Self::DatabaseArc(err) => {
error!(?err, "database (arc) err: {}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcErrorData {
message: "database (arc) error!".into(),
code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(),
data: None,
},
)
}
Self::Decimal(err) => {
debug!(?err, "Decimal Error: {}", err);
(
@ -614,13 +625,13 @@ impl Web3ProxyError {
},
)
}
Self::NoDatabase => {
Self::NoDatabaseConfigured => {
// TODO: this needs more context
error!("no database configured");
debug!("no database configured");
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcErrorData {
message: "no database configured!".into(),
message: "no database configured! this request needs a database".into(),
code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(),
data: None,
},
@ -1088,6 +1099,26 @@ impl Web3ProxyError {
(status_code, Json(response)).into_response()
}
/// some things should keep going even if the db is down
pub fn split_db_errors(&self) -> Result<&Self, &Self> {
match self {
Web3ProxyError::NoDatabaseConfigured => Ok(self),
Web3ProxyError::Database(err) => {
warn!(?err, "db error while checking rpc key authorization");
Ok(self)
}
Web3ProxyError::DatabaseArc(err) => {
warn!(?err, "db arc error while checking rpc key authorization");
Ok(self)
}
Web3ProxyError::Arc(x) => {
// errors from inside moka cache helpers are wrapped in an Arc
x.split_db_errors()
}
_ => Err(self),
}
}
}
impl From<ethers::types::ParseBytesError> for Web3ProxyError {

@ -6,6 +6,7 @@ use crate::app::Web3ProxyApp;
use crate::errors::Web3ProxyResponse;
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext};
use crate::frontend::users::authentication::PostLogin;
use crate::globals::{global_db_conn, global_db_replica_conn};
use crate::premium::{get_user_and_tier_from_address, grant_premium_tier};
use crate::user_token::UserBearerToken;
use axum::{
@ -26,6 +27,7 @@ use http::StatusCode;
use migration::sea_orm::prelude::{Decimal, Uuid};
use migration::sea_orm::{
self, ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter,
TransactionTrait,
};
use serde::{Deserialize, Serialize};
use serde_json::json;
@ -57,7 +59,8 @@ pub async fn admin_increase_balance(
let caller = app.bearer_is_authorized(bearer).await?;
// Establish connections
let txn = app.db_transaction().await?;
let db_conn = global_db_conn().await?;
let txn = db_conn.begin().await?;
// Check if the caller is an admin (if not, return early)
let admin_entry: admin::Model = admin::Entity::find()
@ -90,7 +93,7 @@ pub async fn admin_increase_balance(
// Invalidate the user_balance_cache for this user:
if let Err(err) = app
.user_balance_cache
.invalidate(&user_entry.id, app.db_conn()?, &app.rpc_secret_key_cache)
.invalidate(&user_entry.id, &db_conn, &app.rpc_secret_key_cache)
.await
{
warn!(?err, "unable to invalidate caches");
@ -194,8 +197,8 @@ pub async fn admin_imitate_login_get(
resources: vec![],
};
let db_conn = app.db_conn()?;
let db_replica = app.db_replica()?;
let db_conn = global_db_conn().await?;
let db_replica = global_db_replica_conn().await?;
let admin = user::Entity::find()
.filter(user::Column::Address.eq(admin_address.as_bytes()))
@ -219,7 +222,7 @@ pub async fn admin_imitate_login_get(
let now = Utc::now();
let delete_result = pending_login::Entity::delete_many()
.filter(pending_login::Column::ExpiresAt.lte(now))
.exec(db_conn)
.exec(&db_conn)
.await?;
trace!("cleared expired pending_logins: {:?}", delete_result);
@ -233,7 +236,7 @@ pub async fn admin_imitate_login_get(
};
trail
.save(db_conn)
.save(&db_conn)
.await
.web3_context("saving user's pending_login")?;
@ -256,7 +259,7 @@ pub async fn admin_imitate_login_get(
};
user_pending_login
.save(db_conn)
.save(&db_conn)
.await
.web3_context("saving an admin trail pre login")?;
@ -333,7 +336,7 @@ pub async fn admin_imitate_login_post(
})?;
// fetch the message we gave them from our database
let db_replica = app.db_replica()?;
let db_replica = global_db_replica_conn().await?;
let user_pending_login = pending_login::Entity::find()
.filter(pending_login::Column::Nonce.eq(Uuid::from(login_nonce)))
@ -376,7 +379,7 @@ pub async fn admin_imitate_login_post(
.await?
.web3_context("admin address was not found!")?;
let db_conn = app.db_conn()?;
let db_conn = global_db_conn().await?;
// Add a message that the admin has logged in
// Note that the admin is trying to log in as this user
@ -388,7 +391,7 @@ pub async fn admin_imitate_login_post(
..Default::default()
};
trail
.save(db_conn)
.save(&db_conn)
.await
.web3_context("saving an admin trail post login")?;
@ -437,11 +440,15 @@ pub async fn admin_imitate_login_post(
};
user_login
.save(db_conn)
.save(&db_conn)
.await
.web3_context("saving user login")?;
if let Err(err) = user_pending_login.into_active_model().delete(db_conn).await {
if let Err(err) = user_pending_login
.into_active_model()
.delete(&db_conn)
.await
{
warn!(none=?login_nonce.0, ?err, "Failed to delete nonce");
}

@ -5,6 +5,7 @@ use crate::app::{Web3ProxyApp, APP_USER_AGENT};
use crate::balance::Balance;
use crate::caches::RegisteredUserRateLimitKey;
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::globals::global_db_replica_conn;
use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
use crate::rpcs::blockchain::Web3ProxyBlock;
use crate::rpcs::one::Web3Rpc;
@ -26,8 +27,7 @@ use hashbrown::HashMap;
use http::HeaderValue;
use ipnet::IpNet;
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use parking_lot::Mutex;
use migration::sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
use rdkafka::message::{Header as KafkaHeader, OwnedHeaders as KafkaOwnedHeaders, OwnedMessage};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout as KafkaTimeout;
@ -47,7 +47,7 @@ use tokio::sync::RwLock as AsyncRwLock;
use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore};
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tracing::{error, info, trace, warn};
use tracing::{error, trace, warn};
use ulid::Ulid;
use uuid::Uuid;
@ -173,7 +173,6 @@ pub struct AuthorizationChecks {
#[derive(Clone, Debug)]
pub struct Authorization {
pub checks: AuthorizationChecks,
pub db_conn: Option<DatabaseConnection>,
pub ip: IpAddr,
pub origin: Option<Origin>,
pub referer: Option<Referer>,
@ -390,7 +389,7 @@ pub struct RequestMetadata {
impl Default for Authorization {
fn default() -> Self {
Authorization::internal(None).unwrap()
Authorization::internal().unwrap()
}
}
@ -670,7 +669,7 @@ impl From<RpcSecretKey> for Uuid {
}
impl Authorization {
pub fn internal(db_conn: Option<DatabaseConnection>) -> Web3ProxyResult<Self> {
pub fn internal() -> Web3ProxyResult<Self> {
let authorization_checks = AuthorizationChecks {
// any error logs on a local (internal) query are likely problems. log them all
log_revert_chance: 100,
@ -683,7 +682,6 @@ impl Authorization {
Self::try_new(
authorization_checks,
db_conn,
&ip,
None,
None,
@ -694,7 +692,6 @@ impl Authorization {
pub fn external(
allowed_origin_requests_per_period: &HashMap<String, u64>,
db_conn: Option<DatabaseConnection>,
ip: &IpAddr,
origin: Option<&Origin>,
proxy_mode: ProxyMode,
@ -719,7 +716,6 @@ impl Authorization {
Self::try_new(
authorization_checks,
db_conn,
ip,
origin,
referer,
@ -731,7 +727,6 @@ impl Authorization {
#[allow(clippy::too_many_arguments)]
pub fn try_new(
authorization_checks: AuthorizationChecks,
db_conn: Option<DatabaseConnection>,
ip: &IpAddr,
origin: Option<&Origin>,
referer: Option<&Referer>,
@ -786,7 +781,6 @@ impl Authorization {
Ok(Self {
checks: authorization_checks,
db_conn,
ip: *ip,
origin: origin.cloned(),
referer: referer.cloned(),
@ -994,7 +988,7 @@ impl Web3ProxyApp {
let user_bearer_token = UserBearerToken::try_from(bearer)?;
// get the attached address from the database for the given auth_token.
let db_replica = self.db_replica()?;
let db_replica = global_db_replica_conn().await?;
let user_bearer_uuid: Uuid = user_bearer_token.into();
@ -1019,7 +1013,6 @@ impl Web3ProxyApp {
// we don't care about user agent or origin or referer
let authorization = Authorization::external(
&self.config.allowed_origin_requests_per_period,
self.db_conn().ok().cloned(),
&ip,
None,
proxy_mode,
@ -1074,7 +1067,7 @@ impl Web3ProxyApp {
) -> Web3ProxyResult<RateLimitResult> {
if ip.is_loopback() {
// TODO: localhost being unlimited should be optional
let authorization = Authorization::internal(self.db_conn().ok().cloned())?;
let authorization = Authorization::internal()?;
return Ok(RateLimitResult::Allowed(authorization, None));
}
@ -1085,7 +1078,6 @@ impl Web3ProxyApp {
// they do check origin because we can override rate limits for some origins
let authorization = Authorization::external(
allowed_origin_requests_per_period,
self.db_conn().ok().cloned(),
ip,
origin,
proxy_mode,
@ -1145,7 +1137,7 @@ impl Web3ProxyApp {
let x = self
.rpc_secret_key_cache
.try_get_with_by_ref(rpc_secret_key, async move {
let db_replica = self.db_replica()?;
let db_replica = global_db_replica_conn().await?;
// TODO: join the user table to this to return the User? we don't always need it
// TODO: join on secondary users
@ -1270,7 +1262,7 @@ impl Web3ProxyApp {
let rpc_key_id =
Some(rpc_key_model.id.try_into().context("db ids are never 0")?);
Ok(AuthorizationChecks {
Ok::<_, Web3ProxyError>(AuthorizationChecks {
allowed_ips,
allowed_origins,
allowed_referers,
@ -1307,10 +1299,25 @@ impl Web3ProxyApp {
rpc_key: &RpcSecretKey,
user_agent: Option<&UserAgent>,
) -> Web3ProxyResult<RateLimitResult> {
let authorization_checks = self.authorization_checks(proxy_mode, rpc_key).await?;
let authorization_checks = match self.authorization_checks(proxy_mode, rpc_key).await {
Ok(x) => x,
Err(err) => {
if let Ok(_err) = err.split_db_errors() {
// // TODO: this is too verbose during an outage. the warnings on the config reloader should be fine
// warn!(
// ?err,
// "db is down. cannot check rpc key. fallback to ip rate limits"
// );
return self.rate_limit_by_ip(ip, origin, proxy_mode).await;
}
return Err(err);
}
};
// if no rpc_key_id matching the given rpc was found, then we can't rate limit by key
if authorization_checks.rpc_secret_key_id.is_none() {
trace!("unknown key. falling back to free limits");
return self.rate_limit_by_ip(ip, origin, proxy_mode).await;
}
@ -1320,7 +1327,6 @@ impl Web3ProxyApp {
let authorization = Authorization::try_new(
authorization_checks,
self.db_conn().ok().cloned(),
ip,
origin,
referer,

@ -259,25 +259,28 @@ pub async fn serve(
.layer(Extension(Arc::new(response_cache)))
// request id
.layer(
TraceLayer::new_for_http().make_span_with(|request: &Request<Body>| {
// We get the request id from the header
// If no header, a new Ulid is created
// TODO: move this header name to config
let request_id = request
.headers()
.get("x-amzn-trace-id")
.and_then(|x| x.to_str().ok())
.map(ToString::to_string)
.unwrap_or_else(|| Ulid::new().to_string());
TraceLayer::new_for_http()
.make_span_with(|request: &Request<Body>| {
// We get the request id from the header
// If no header, a new Ulid is created
// TODO: move this header name to config
let request_id = request
.headers()
.get("x-amzn-trace-id")
.and_then(|x| x.to_str().ok())
.map(ToString::to_string)
.unwrap_or_else(|| Ulid::new().to_string());
// And then we put it along with other information into the `request` span
error_span!(
"request",
id = %request_id,
// method = %request.method(),
// path = %request.uri().path(),
)
}),
// And then we put it along with other information into the `request` span
error_span!(
"request",
id = %request_id,
// method = %request.method(),
// path = %request.uri().path(),
)
})
// TODO: on failure that has the request and response body so we can debug more easily
.on_failure(()),
)
// 404 for any unknown routes
.fallback(errors::handler_404);

@ -35,7 +35,7 @@ use std::str::from_utf8_mut;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc, OwnedSemaphorePermit, RwLock as AsyncRwLock};
use tracing::{info, trace};
use tracing::trace;
/// How to select backend servers for a request
#[derive(Copy, Clone, Debug, Default)]
@ -484,7 +484,7 @@ async fn read_web3_socket(
return;
}
Message::Close(_) => {
info!("closing websocket connection");
trace!("closing websocket connection");
// TODO: do something to close subscriptions?
let _ = close_sender.send(true);
return;

@ -2,6 +2,7 @@
use crate::app::Web3ProxyApp;
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResponse};
use crate::frontend::authorization::{login_is_authorized, RpcSecretKey};
use crate::globals::{global_db_conn, global_db_replica_conn};
use crate::user_token::UserBearerToken;
use axum::{
extract::{Path, Query},
@ -124,12 +125,12 @@ pub async fn user_login_get(
resources: vec![],
};
let db_conn = app.db_conn()?;
let db_conn = global_db_conn().await?;
// delete any expired logins
if let Err(err) = login::Entity::delete_many()
.filter(login::Column::ExpiresAt.lte(now))
.exec(db_conn)
.exec(&db_conn)
.await
{
warn!(?err, "expired_logins");
@ -138,7 +139,7 @@ pub async fn user_login_get(
// delete any expired pending logins
if let Err(err) = pending_login::Entity::delete_many()
.filter(pending_login::Column::ExpiresAt.lte(now))
.exec(db_conn)
.exec(&db_conn)
.await
{
warn!(?err, "expired_pending_logins");
@ -160,7 +161,7 @@ pub async fn user_login_get(
};
user_pending_login
.save(db_conn)
.save(&db_conn)
.await
.web3_context("saving user's pending_login")?;
@ -261,7 +262,7 @@ pub async fn user_login_post(
let login_nonce = UserBearerToken::from_str(&their_msg.nonce)?;
// fetch the message we gave them from our database
let db_replica = app.db_replica()?;
let db_replica = global_db_replica_conn().await?;
let user_pending_login = pending_login::Entity::find()
.filter(pending_login::Column::Nonce.eq(Uuid::from(login_nonce)))
@ -293,7 +294,7 @@ pub async fn user_login_post(
.one(db_replica.as_ref())
.await?;
let db_conn = app.db_conn()?;
let db_conn = global_db_conn().await?;
let (caller, user_rpc_keys, status_code) = match caller {
None => {
@ -384,7 +385,7 @@ pub async fn user_login_post(
// the user is already registered
let user_rpc_keys = rpc_key::Entity::find()
.filter(rpc_key::Column::UserId.eq(caller.id))
.all(db_conn)
.all(&db_conn)
.await
.web3_context("failed loading user's key")?;
@ -411,11 +412,15 @@ pub async fn user_login_post(
};
user_login
.save(db_conn)
.save(&db_conn)
.await
.web3_context("saving user login")?;
if let Err(err) = user_pending_login.into_active_model().delete(db_conn).await {
if let Err(err) = user_pending_login
.into_active_model()
.delete(&db_conn)
.await
{
error!("Failed to delete nonce:{}: {}", login_nonce, err);
}
@ -438,16 +443,15 @@ pub async fn user_login_post(
/// `POST /user/logout` - Forget the bearer token in the `Authentication` header.
#[debug_handler]
pub async fn user_logout_post(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
) -> Web3ProxyResponse {
let user_bearer = UserBearerToken::try_from(bearer)?;
let db_conn = app.db_conn()?;
let db_conn = global_db_conn().await?;
if let Err(err) = login::Entity::delete_many()
.filter(login::Column::BearerToken.eq(user_bearer.uuid()))
.exec(db_conn)
.exec(&db_conn)
.await
{
warn!(key=%user_bearer.redis_key(), ?err, "Failed to delete from redis");

@ -9,6 +9,7 @@ pub mod subuser;
use crate::app::Web3ProxyApp;
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResponse};
use crate::globals::global_db_transaction;
use axum::{
headers::{authorization::Bearer, Authorization},
response::IntoResponse,
@ -80,7 +81,7 @@ pub async fn user_post(
}
}
let txn = app.db_transaction().await?;
let txn = global_db_transaction().await?;
// update the referral code IFF they do not already have one set
if let Some(x) = payload.referral_code {

@ -3,6 +3,7 @@ use crate::balance::Balance;
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResponse, Web3ProxyResult};
use crate::frontend::authorization::login_is_authorized;
use crate::frontend::users::authentication::register_new_user;
use crate::globals::{global_db_conn, global_db_replica_conn};
use crate::premium::{get_user_and_tier_from_address, grant_premium_tier};
use anyhow::Context;
use axum::{
@ -45,7 +46,7 @@ pub async fn user_balance_get(
) -> Web3ProxyResponse {
let user = app.bearer_is_authorized(bearer).await?;
let db_replica = app.db_replica()?;
let db_replica = global_db_replica_conn().await?;
let user_balance = match Balance::try_from_db(db_replica.as_ref(), user.id).await? {
None => Balance::default(),
@ -65,7 +66,7 @@ pub async fn user_chain_deposits_get(
) -> Web3ProxyResponse {
let user = app.bearer_is_authorized(bearer).await?;
let db_replica = app.db_replica()?;
let db_replica = global_db_replica_conn().await?;
// Filter by user ...
let receipts = increase_on_chain_balance_receipt::Entity::find()
@ -104,7 +105,7 @@ pub async fn user_stripe_deposits_get(
) -> Web3ProxyResponse {
let user = app.bearer_is_authorized(bearer).await?;
let db_replica = app.db_replica()?;
let db_replica = global_db_replica_conn().await?;
// Filter by user ...
let receipts = stripe_increase_balance_receipt::Entity::find()
@ -147,7 +148,7 @@ pub async fn user_admin_deposits_get(
) -> Web3ProxyResponse {
let user = app.bearer_is_authorized(bearer).await?;
let db_replica = app.db_replica()?;
let db_replica = global_db_replica_conn().await?;
// Filter by user ...
let receipts = admin_increase_balance_receipt::Entity::find()
@ -206,7 +207,7 @@ pub async fn user_balance_post(
Web3ProxyError::BadRequest(format!("unable to parse tx_hash: {}", err).into())
})?;
let db_conn = app.db_conn()?;
let db_conn = global_db_conn().await?;
// get the transaction receipt
let transaction_receipt = app
@ -235,7 +236,7 @@ pub async fn user_balance_post(
true
};
let uncle_hashes = find_uncles.all(db_conn).await?;
let uncle_hashes = find_uncles.all(&db_conn).await?;
let uncle_hashes: HashSet<_> = uncle_hashes
.into_iter()
@ -283,7 +284,7 @@ pub async fn user_balance_post(
.filter(increase_on_chain_balance_receipt::Column::TxHash.eq(tx_hash.encode_hex()))
.filter(increase_on_chain_balance_receipt::Column::ChainId.eq(app.config.chain_id))
.filter(increase_on_chain_balance_receipt::Column::BlockHash.eq(block_hash.encode_hex()))
.one(db_conn)
.one(&db_conn)
.await?
.is_some()
{
@ -413,7 +414,7 @@ pub async fn user_balance_post(
// invalidate the cache as well
if let Err(err) = app
.user_balance_cache
.invalidate(&recipient.id, db_conn, &app.rpc_secret_key_cache)
.invalidate(&recipient.id, &db_conn, &app.rpc_secret_key_cache)
.await
{
warn!(?err, user_id=%recipient.id, "unable to invalidate cache");
@ -484,12 +485,12 @@ pub async fn handle_uncle_block(
// user_id -> balance that we need to subtract
let mut reversed_balances: HashMap<u64, Decimal> = HashMap::new();
let db_conn = app.db_conn()?;
let db_conn = global_db_conn().await?;
// delete any deposit txids with uncle_hash
for reversed_deposit in increase_on_chain_balance_receipt::Entity::find()
.filter(increase_on_chain_balance_receipt::Column::BlockHash.eq(uncle_hash.encode_hex()))
.all(db_conn)
.all(&db_conn)
.await?
{
let user_id = reversed_deposit.deposit_to_user_id;
@ -499,11 +500,11 @@ pub async fn handle_uncle_block(
*reversed_balance += reversed_deposit.amount;
// TODO: instead of delete, mark as uncled? seems like it would bloat the db unnecessarily. a stat should be enough
reversed_deposit.delete(db_conn).await?;
reversed_deposit.delete(&db_conn).await?;
if let Err(err) = app
.user_balance_cache
.invalidate(&user_id, db_conn, &app.rpc_secret_key_cache)
.invalidate(&user_id, &db_conn, &app.rpc_secret_key_cache)
.await
{
warn!(%user_id, ?err, "unable to invalidate caches");

@ -1,5 +1,6 @@
use crate::app::Web3ProxyApp;
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResponse};
use crate::globals::global_db_conn;
use crate::premium::grant_premium_tier;
use anyhow::Context;
use axum::{response::IntoResponse, Extension};
@ -68,13 +69,15 @@ pub async fn user_balance_stripe_post(
return Ok("Received Webhook".into_response());
}
let db_conn = app.db_conn().context("query_user_stats needs a db")?;
let db_conn = global_db_conn()
.await
.web3_context("query_user_stats needs a db")?;
if stripe_increase_balance_receipt::Entity::find()
.filter(
stripe_increase_balance_receipt::Column::StripePaymentIntendId.eq(intent.id.as_str()),
)
.one(db_conn)
.one(&db_conn)
.await?
.is_some()
{
@ -93,7 +96,7 @@ pub async fn user_balance_stripe_post(
))?;
let recipient: Option<user::Model> = user::Entity::find_by_id(recipient_user_id)
.one(db_conn)
.one(&db_conn)
.await?;
// we do a fixed 2 decimal points because we only accept USD for now
@ -121,7 +124,7 @@ pub async fn user_balance_stripe_post(
currency=%intent.currency, %recipient_user_id, %intent.id,
"Please refund this transaction!",
);
let _ = insert_receipt_model.save(db_conn).await;
let _ = insert_receipt_model.save(&db_conn).await;
return Ok("Received Webhook".into_response());
}
@ -145,7 +148,7 @@ pub async fn user_balance_stripe_post(
// Finally invalidate the cache as well
if let Err(err) = app
.user_balance_cache
.invalidate(&recipient.id, db_conn, &app.rpc_secret_key_cache)
.invalidate(&recipient.id, &db_conn, &app.rpc_secret_key_cache)
.await
{
warn!(?err, "unable to invalidate caches");

@ -1,7 +1,8 @@
//! Handle registration, logins, and managing account data.
use crate::app::Web3ProxyApp;
use crate::errors::Web3ProxyResponse;
use crate::globals::global_db_conn;
use crate::referral_code::ReferralCode;
use crate::{app::Web3ProxyApp, globals::global_db_replica_conn};
use anyhow::Context;
use axum::{
extract::Query,
@ -35,7 +36,7 @@ pub async fn user_referral_link_get(
// First get the bearer token and check if the user is logged in
let user = app.bearer_is_authorized(bearer).await?;
let db_replica = app.db_replica()?;
let db_replica = global_db_replica_conn().await?;
// Then get the referral token. If one doesn't exist, create one
let user_referrer = referrer::Entity::find()
@ -47,7 +48,7 @@ pub async fn user_referral_link_get(
Some(x) => (x.referral_code, StatusCode::OK),
None => {
// Connect to the database for writes
let db_conn = app.db_conn()?;
let db_conn = global_db_conn().await?;
let referral_code = ReferralCode::default().to_string();
@ -56,7 +57,7 @@ pub async fn user_referral_link_get(
referral_code: sea_orm::ActiveValue::Set(referral_code.clone()),
..Default::default()
};
referrer_entry.save(db_conn).await?;
referrer_entry.save(&db_conn).await?;
(referral_code, StatusCode::CREATED)
}
@ -80,7 +81,7 @@ pub async fn user_used_referral_stats(
// First get the bearer token and check if the user is logged in
let user = app.bearer_is_authorized(bearer).await?;
let db_replica = app.db_replica()?;
let db_replica = global_db_replica_conn().await?;
// Get all referral records associated with this user
let referrals = referee::Entity::find()
@ -138,7 +139,7 @@ pub async fn user_shared_referral_stats(
// First get the bearer token and check if the user is logged in
let user = app.bearer_is_authorized(bearer).await?;
let db_replica = app.db_replica()?;
let db_replica = global_db_replica_conn().await?;
// Get all referral records associated with this user
let query_result = referrer::Entity::find()

@ -2,6 +2,7 @@
use super::super::authorization::RpcSecretKey;
use crate::app::Web3ProxyApp;
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResponse};
use crate::globals::{global_db_conn, global_db_replica_conn};
use axum::headers::{Header, Origin, Referer, UserAgent};
use axum::{
headers::{authorization::Bearer, Authorization},
@ -31,7 +32,7 @@ pub async fn rpc_keys_get(
) -> Web3ProxyResponse {
let user = app.bearer_is_authorized(bearer).await?;
let db_replica = app.db_replica()?;
let db_replica = global_db_replica_conn().await?;
// This is basically completely copied from sea-orm. Not optimal, but it keeps the format identical to before (while adding the final key)
// We could also pack the below stuff into it's subfield, but then we would destroy the format. Both options are fine for now though
@ -160,7 +161,7 @@ pub async fn rpc_keys_management(
let user = app.bearer_is_authorized(bearer).await?;
let db_replica = app.db_replica()?;
let db_replica = global_db_replica_conn().await?;
let mut uk = match payload.key_id {
Some(existing_key_id) => {
@ -340,9 +341,9 @@ pub async fn rpc_keys_management(
}
let uk = if uk.is_changed() {
let db_conn = app.db_conn()?;
let db_conn = global_db_conn().await?;
uk.save(db_conn)
uk.save(&db_conn)
.await
.web3_context("Failed saving user key")?
} else {

@ -1,6 +1,7 @@
//! Handle registration, logins, and managing account data.
use crate::app::Web3ProxyApp;
use crate::errors::{Web3ProxyErrorContext, Web3ProxyResponse};
use crate::globals::global_db_replica_conn;
use crate::http_params::{
get_chain_id_from_params, get_page_from_params, get_query_start_from_params,
};
@ -46,7 +47,7 @@ pub async fn user_revert_logs_get(
response.insert("chain_id", json!(chain_id));
response.insert("query_start", json!(query_start.timestamp() as u64));
let db_replica = app.db_replica()?;
let db_replica = global_db_replica_conn().await?;
let uks = rpc_key::Entity::find()
.filter(rpc_key::Column::UserId.eq(user.id))

@ -2,6 +2,7 @@
use crate::app::Web3ProxyApp;
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResponse};
use crate::frontend::authorization::RpcSecretKey;
use crate::globals::{global_db_conn, global_db_replica_conn};
use anyhow::Context;
use axum::{
extract::Query,
@ -35,7 +36,7 @@ pub async fn get_keys_as_subuser(
// First, authenticate
let subuser = app.bearer_is_authorized(bearer).await?;
let db_replica = app.db_replica()?;
let db_replica = global_db_replica_conn().await?;
// TODO: JOIN over RPC_KEY, SUBUSER, PRIMARY_USER and return these items
@ -100,7 +101,7 @@ pub async fn get_subusers(
// First, authenticate
let user = app.bearer_is_authorized(bearer).await?;
let db_replica = app.db_replica()?;
let db_replica = global_db_replica_conn().await?;
let rpc_key: u64 = params
.remove("key_id")
@ -172,7 +173,7 @@ pub async fn modify_subuser(
// First, authenticate
let user = app.bearer_is_authorized(bearer).await?;
let db_replica = app.db_replica()?;
let db_replica = global_db_replica_conn().await?;
trace!("Parameters are: {:?}", params);
@ -256,7 +257,8 @@ pub async fn modify_subuser(
}
// TODO: There is a good chunk of duplicate logic as login-post. Consider refactoring ...
let db_conn = app.db_conn()?;
let db_conn = global_db_conn().await?;
let (subuser, _subuser_rpc_keys, _status_code) = match subuser {
None => {
// First add a user; the only thing we need from them is an address
@ -332,12 +334,12 @@ pub async fn modify_subuser(
let mut active_subuser_entry_secondary_user = secondary_user.into_active_model();
if !keep_subuser {
// Remove the user
active_subuser_entry_secondary_user.delete(db_conn).await?;
active_subuser_entry_secondary_user.delete(&db_conn).await?;
action = "removed";
} else {
// Just change the role
active_subuser_entry_secondary_user.role = sea_orm::Set(new_role.clone());
active_subuser_entry_secondary_user.save(db_conn).await?;
active_subuser_entry_secondary_user.save(&db_conn).await?;
action = "role modified";
}
}
@ -348,7 +350,7 @@ pub async fn modify_subuser(
role: sea_orm::Set(new_role.clone()),
..Default::default()
};
active_subuser_entry_secondary_user.insert(db_conn).await?;
active_subuser_entry_secondary_user.insert(&db_conn).await?;
action = "added";
}
_ => {

55
web3_proxy/src/globals.rs Normal file

@ -0,0 +1,55 @@
use crate::{errors::Web3ProxyError, relational_db::DatabaseReplica};
use derivative::Derivative;
use migration::{
sea_orm::{DatabaseConnection, DatabaseTransaction, TransactionTrait},
DbErr,
};
use std::sync::{Arc, LazyLock};
use tokio::sync::RwLock as AsyncRwLock;
pub static DB_CONN: LazyLock<AsyncRwLock<Result<DatabaseConnection, DatabaseError>>> =
LazyLock::new(|| AsyncRwLock::new(Err(DatabaseError::NotConfigured)));
pub static DB_REPLICA: LazyLock<AsyncRwLock<Result<DatabaseReplica, DatabaseError>>> =
LazyLock::new(|| AsyncRwLock::new(Err(DatabaseError::NotConfigured)));
#[derive(Clone, Debug, Derivative)]
pub enum DatabaseError {
/// no database configured. depending on what you need, this may or may not be a problem
NotConfigured,
/// an error that happened when creating the connection pool
Connect(Arc<DbErr>),
/// an error that just happened
Begin(Arc<DbErr>),
}
impl From<DatabaseError> for Web3ProxyError {
fn from(value: DatabaseError) -> Self {
match value {
DatabaseError::NotConfigured => Self::NoDatabaseConfigured,
DatabaseError::Connect(err) | DatabaseError::Begin(err) => Self::DatabaseArc(err),
}
}
}
#[inline]
pub async fn global_db_conn() -> Result<DatabaseConnection, DatabaseError> {
DB_CONN.read().await.clone()
}
#[inline]
pub async fn global_db_transaction() -> Result<DatabaseTransaction, DatabaseError> {
let x = global_db_conn().await?;
let x = x
.begin()
.await
.map_err(|x| DatabaseError::Begin(Arc::new(x)))?;
Ok(x)
}
#[inline]
pub async fn global_db_replica_conn() -> Result<DatabaseReplica, DatabaseError> {
DB_REPLICA.read().await.clone()
}

@ -1,3 +1,4 @@
#![feature(lazy_cell)]
#![feature(let_chains)]
#![feature(trait_alias)]
#![forbid(unsafe_code)]
@ -11,6 +12,7 @@ pub mod compute_units;
pub mod config;
pub mod errors;
pub mod frontend;
pub mod globals;
pub mod http_params;
pub mod jsonrpc;
pub mod pagerduty;

@ -34,15 +34,15 @@ pub async fn connect_db(
// TODO: load all these options from the config file. i think docker mysql default max is 100
// Amazon RDS Proxy default idle timeout is 1800 seconds
// TODO: sqlx info logging is way too verbose for production.
db_opt
.acquire_timeout(Duration::from_secs(5))
.connect_timeout(Duration::from_secs(5))
.acquire_timeout(Duration::from_secs(3))
.connect_timeout(Duration::from_secs(3))
.idle_timeout(Duration::from_secs(1795))
.min_connections(min_connections)
.max_connections(max_connections)
.sqlx_logging_level(tracing::log::LevelFilter::Trace)
.sqlx_logging(true);
.max_lifetime(Duration::from_secs(60))
.min_connections(min_connections)
.sqlx_logging(true)
.sqlx_logging_level(tracing::log::LevelFilter::Trace);
Database::connect(db_opt).await
}
@ -76,33 +76,23 @@ pub async fn migrate_db(
.col(ColumnDef::new(Alias::new("locked")).boolean().default(true)),
);
loop {
if Migrator::get_pending_migrations(db_conn).await?.is_empty() {
info!("no migrations to apply");
return Ok(());
if Migrator::get_pending_migrations(db_conn).await?.is_empty() {
info!("no migrations to apply");
return Ok(());
}
info!("checking migration lock...");
// there are migrations to apply
// acquire a lock
if db_conn.execute(create_lock_statment).await.is_err() {
if override_existing_lock {
warn!("OVERRIDING EXISTING LOCK in 10 seconds! ctrl+c now if other migrations are actually running!");
sleep(Duration::from_secs(10)).await
} else {
return Err(anyhow::anyhow!("Unable to acquire lock. if you are positive no migration is running, run \"web3_proxy_cli drop_migration_lock\""));
}
info!("waiting for migration lock...");
// there are migrations to apply
// acquire a lock
if let Err(err) = db_conn.execute(create_lock_statment.clone()).await {
if override_existing_lock {
warn!("OVERRIDING EXISTING LOCK in 10 seconds! ctrl+c now if other migrations are actually running!");
sleep(Duration::from_secs(10)).await
} else {
debug!("Unable to acquire lock. if you are positive no migration is running, run \"web3_proxy_cli drop_migration_lock\". err={:?}", err);
// TODO: exponential backoff with jitter?
sleep(Duration::from_secs(1)).await;
continue;
}
}
debug!("migration lock acquired");
break;
}
info!("migrating...");

@ -179,7 +179,6 @@ impl Web3Rpcs {
return None;
}
let db_conn = app.db_conn().ok().cloned();
let http_client = app.http_client.clone();
let vredis_pool = app.vredis_pool.clone();
@ -197,7 +196,6 @@ impl Web3Rpcs {
let handle = tokio::spawn(server_config.spawn(
server_name,
db_conn,
vredis_pool,
chain_id,
block_interval,
@ -372,11 +370,11 @@ impl Web3Rpcs {
}
if let Err(e) = try_join_all(futures).await {
error!("subscriptions over: {:?}", self);
error!(?self, "subscriptions over");
return Err(e);
}
info!("subscriptions over: {:?}", self);
info!(?self, "subscriptions over");
Ok(())
}

@ -92,7 +92,6 @@ impl Web3Rpc {
config: Web3RpcConfig,
name: String,
chain_id: u64,
db_conn: Option<DatabaseConnection>,
// optional because this is only used for http providers. websocket-only providers don't use it
http_client: Option<reqwest::Client>,
redis_pool: Option<RedisPool>,
@ -183,7 +182,6 @@ impl Web3Rpc {
block_data_limit,
block_interval,
created_at: Some(created_at),
db_conn,
display_name: config.display_name,
hard_limit,
hard_limit_until: Some(hard_limit_until),
@ -469,7 +467,7 @@ impl Web3Rpc {
let age = self.created_at.unwrap().elapsed().as_millis();
debug!("clearing head block on {} ({}ms old)!", self, age);
trace!("clearing head block on {} ({}ms old)!", self, age);
// send an empty block to take this server out of rotation
head_block_sender.send_replace(None);
@ -676,7 +674,7 @@ impl Web3Rpc {
disconnect_watch_rx.changed().await?;
}
info!("disconnect triggered on {}", rpc);
trace!("disconnect triggered on {}", rpc);
Ok(())
};
@ -723,7 +721,7 @@ impl Web3Rpc {
sleep(Duration::from_secs(health_sleep_seconds)).await;
}
debug!("healthcheck loop on {} exited", rpc);
trace!("healthcheck loop on {} exited", rpc);
Ok(())
};
@ -747,6 +745,7 @@ impl Web3Rpc {
}
// exit if any of the futures exit
// TODO: have an enum for which one exited?
let first_exit = futures.next().await;
debug!(?first_exit, "subscriptions on {} exited", self);
@ -858,7 +857,7 @@ impl Web3Rpc {
.await?;
if *subscribe_stop_rx.borrow() {
debug!(%self, "new heads subscription exited");
trace!(%self, "new heads subscription exited");
Ok(())
} else {
Err(anyhow!("new_heads subscription exited. reconnect needed").into())

@ -1,8 +1,8 @@
use super::one::Web3Rpc;
use crate::errors::{Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::authorization::{Authorization, AuthorizationType};
use crate::globals::{global_db_conn, DB_CONN};
use crate::jsonrpc::{JsonRpcParams, JsonRpcResultData};
use anyhow::Context;
use chrono::Utc;
use derive_more::From;
use entities::revert_log;
@ -90,7 +90,7 @@ impl Authorization {
}
};
let db_conn = self.db_conn.as_ref().context("no database connection")?;
let db_conn = global_db_conn().await?;
// TODO: should the database set the timestamp?
// we intentionally use "now" and not the time the request started
@ -111,7 +111,7 @@ impl Authorization {
};
let rl = rl
.save(db_conn)
.save(&db_conn)
.await
.web3_context("Failed saving new revert log")?;
@ -225,7 +225,7 @@ impl OpenRequestHandle {
if !["eth_call", "eth_estimateGas"].contains(&method) {
// trace!(%method, "skipping save on revert");
RequestErrorHandler::TraceLevel
} else if self.authorization.db_conn.is_some() {
} else if DB_CONN.read().await.is_ok() {
let log_revert_chance = self.authorization.checks.log_revert_chance;
if log_revert_chance == 0 {
@ -378,6 +378,7 @@ impl OpenRequestHandle {
match serde_json::from_value::<EthCallParams>(json!(params)) {
Ok(params) => {
// spawn saving to the database so we don't slow down the request
// TODO: log if this errors
let f = self.authorization.clone().save_revert(method, params.0 .0);
tokio::spawn(f);

@ -1,6 +1,7 @@
use super::StatType;
use crate::app::Web3ProxyApp;
use crate::errors::{Web3ProxyError, Web3ProxyResponse, Web3ProxyResult};
use crate::globals::{global_db_conn, global_db_replica_conn};
use crate::http_params::{
get_chain_id_from_params, get_page_from_params, get_query_start_from_params,
get_query_window_seconds_from_params, get_user_id_from_params,
@ -61,13 +62,13 @@ pub async fn query_user_stats<'a>(
params: &'a HashMap<String, String>,
stat_response_type: StatType,
) -> Web3ProxyResponse {
let db_conn = app.db_conn()?;
let db_replica = app.db_replica()?;
let db_conn = global_db_conn().await?;
let db_replica = global_db_replica_conn().await?;
let mut redis_conn = app.redis_conn().await?;
// get the user id first. if it is 0, we should use a cache on the app
let user_id =
get_user_id_from_params(&mut redis_conn, db_conn, db_replica, bearer, params).await?;
get_user_id_from_params(&mut redis_conn, &db_conn, &db_replica, bearer, params).await?;
// get the query window seconds now so that we can pick a cache with a good TTL
// TODO: for now though, just do one cache. its easier
let query_window_seconds = get_query_window_seconds_from_params(params)?;

@ -1,5 +1,6 @@
use super::StatType;
use crate::errors::Web3ProxyErrorContext;
use crate::globals::global_db_replica_conn;
use crate::{
app::Web3ProxyApp,
errors::{Web3ProxyError, Web3ProxyResponse},
@ -47,7 +48,7 @@ pub async fn query_user_stats<'a>(
));
}
let db_replica = app.db_replica()?;
let db_replica = global_db_replica_conn().await?;
// Read the (optional) user-id from the request, this is the logic for subusers
// If there is no bearer token, this is not allowed

@ -527,7 +527,7 @@ impl RpcQueryStats {
let mut authorization = metadata.authorization.take();
if authorization.is_none() {
authorization = Some(Arc::new(Authorization::internal(None)?));
authorization = Some(Arc::new(Authorization::internal()?));
}
let authorization = authorization.expect("Authorization will always be set");

@ -2,13 +2,13 @@ use super::{AppStat, FlushedStats, RpcQueryKey};
use crate::app::Web3ProxyJoinHandle;
use crate::caches::{RpcSecretKeyCache, UserBalanceCache};
use crate::errors::Web3ProxyResult;
use crate::globals::global_db_conn;
use crate::stats::RpcQueryStats;
use derive_more::From;
use futures::stream;
use hashbrown::HashMap;
use influxdb2::api::write::TimestampPrecision;
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::DatabaseConnection;
use std::time::Duration;
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio::time::{interval, sleep};
@ -45,7 +45,6 @@ pub struct StatBuffer {
accounting_db_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>,
billing_period_seconds: i64,
chain_id: u64,
db_conn: Option<DatabaseConnection>,
db_save_interval_seconds: u32,
global_timeseries_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>,
influxdb_bucket: Option<String>,
@ -64,7 +63,6 @@ impl StatBuffer {
pub fn try_spawn(
billing_period_seconds: i64,
chain_id: u64,
db_conn: Option<DatabaseConnection>,
db_save_interval_seconds: u32,
influxdb_bucket: Option<String>,
mut influxdb_client: Option<influxdb2::Client>,
@ -79,10 +77,6 @@ impl StatBuffer {
influxdb_client = None;
}
if db_conn.is_none() && influxdb_client.is_none() {
return Ok(None);
}
let (stat_sender, stat_receiver) = mpsc::unbounded_channel();
let timestamp_precision = TimestampPrecision::Seconds;
@ -91,7 +85,6 @@ impl StatBuffer {
accounting_db_buffer: Default::default(),
billing_period_seconds,
chain_id,
db_conn,
db_save_interval_seconds,
global_timeseries_buffer: Default::default(),
influxdb_bucket,
@ -142,51 +135,50 @@ impl StatBuffer {
let mut approximate_balance_remaining = 0.into();
// TODO: re-enable this once I know its not the cause of Polygon W3P crashing all the time
if false {
if let Some(db_conn) = self.db_conn.as_ref() {
let user_id = stat.authorization.checks.user_id;
// TODO: we want to do this even if the db is down. we need to buffer if there is an outage!
if let Ok(db_conn) = global_db_conn().await {
let user_id = stat.authorization.checks.user_id;
// update the user's balance
if user_id != 0 {
// update the user's cached balance
let mut user_balance = stat.authorization.checks.latest_balance.write().await;
// update the user's balance
if user_id != 0 {
// update the user's cached balance
let mut user_balance = stat.authorization.checks.latest_balance.write().await;
// TODO: move this to a helper function
user_balance.total_frontend_requests += 1;
user_balance.total_spent += stat.compute_unit_cost;
// TODO: move this to a helper function
user_balance.total_frontend_requests += 1;
user_balance.total_spent += stat.compute_unit_cost;
if !stat.backend_rpcs_used.is_empty() {
user_balance.total_cache_misses += 1;
}
// if paid_credits_used is true, then they were premium at the start of the request
if stat.authorization.checks.paid_credits_used {
// TODO: this lets them get a negative remaining balance. we should clear if close to 0
user_balance.total_spent_paid_credits += stat.compute_unit_cost;
// check if they still have premium
if user_balance.active_premium() {
// TODO: referall credits here? i think in the save_db section still makes sense for those
} else if let Err(err) = self.user_balance_cache.invalidate(&user_balance.user_id, db_conn, &self.rpc_secret_key_cache).await {
// was premium, but isn't anymore due to paying for this query. clear the cache
// TODO: stop at <$0.000001 instead of negative?
warn!(?err, "unable to clear caches");
}
} else if user_balance.active_premium() {
// paid credits were not used, but now we have active premium. invalidate the caches
// TODO: this seems unliekly. should we warn if this happens so we can investigate?
if let Err(err) = self.user_balance_cache.invalidate(&user_balance.user_id, db_conn, &self.rpc_secret_key_cache).await {
// was premium, but isn't anymore due to paying for this query. clear the cache
// TODO: stop at <$0.000001 instead of negative?
warn!(?err, "unable to clear caches");
}
}
approximate_balance_remaining = user_balance.remaining();
if !stat.backend_rpcs_used.is_empty() {
user_balance.total_cache_misses += 1;
}
// self.accounting_db_buffer.entry(stat.accounting_key(self.billing_period_seconds)).or_default().add(stat.clone(), approximate_balance_remaining).await;
// if paid_credits_used is true, then they were premium at the start of the request
if stat.authorization.checks.paid_credits_used {
// TODO: this lets them get a negative remaining balance. we should clear if close to 0
user_balance.total_spent_paid_credits += stat.compute_unit_cost;
// check if they still have premium
if user_balance.active_premium() {
// TODO: referall credits here? i think in the save_db section still makes sense for those
} else if let Err(err) = self.user_balance_cache.invalidate(&user_balance.user_id, &db_conn, &self.rpc_secret_key_cache).await {
// was premium, but isn't anymore due to paying for this query. clear the cache
// TODO: stop at <$0.000001 instead of negative?
warn!(?err, "unable to clear caches");
}
} else if user_balance.active_premium() {
// paid credits were not used, but now we have active premium. invalidate the caches
// TODO: this seems unliekly. should we warn if this happens so we can investigate?
if let Err(err) = self.user_balance_cache.invalidate(&user_balance.user_id, &db_conn, &self.rpc_secret_key_cache).await {
// was premium, but isn't anymore due to paying for this query. clear the cache
// TODO: stop at <$0.000001 instead of negative?
warn!(?err, "unable to clear caches");
}
}
approximate_balance_remaining = user_balance.remaining();
}
self.accounting_db_buffer.entry(stat.accounting_key(self.billing_period_seconds)).or_default().add(stat.clone(), approximate_balance_remaining).await;
}
if self.influxdb_client.is_some() {
@ -259,8 +251,8 @@ impl StatBuffer {
// TODO: wait on all websockets to close
// TODO: wait on all pending external requests to finish
info!("waiting 10 seconds for remaining stats to arrive");
sleep(Duration::from_secs(10)).await;
info!("waiting 5 seconds for remaining stats to arrive");
sleep(Duration::from_secs(5)).await;
// loop {
// // nope. this won't ever be true because we keep making stats for internal requests
@ -289,7 +281,7 @@ impl StatBuffer {
async fn save_relational_stats(&mut self) -> usize {
let mut count = 0;
if let Some(db_conn) = self.db_conn.as_ref() {
if let Ok(db_conn) = global_db_conn().await {
count = self.accounting_db_buffer.len();
for (key, stat) in self.accounting_db_buffer.drain() {
// TODO: batch saves
@ -297,7 +289,7 @@ impl StatBuffer {
if let Err(err) = stat
.save_db(
self.chain_id,
db_conn,
&db_conn,
key,
&self.user_balance_cache,
&self.rpc_secret_key_cache,

@ -78,7 +78,6 @@ impl MigrateStatsToV2SubCommand {
let emitter_spawn = StatBuffer::try_spawn(
BILLING_PERIOD_SECONDS,
top_config.app.chain_id,
Some(db_conn.clone()),
30,
top_config.app.influxdb_bucket.clone(),
influxdb_client.clone(),
@ -117,8 +116,8 @@ impl MigrateStatsToV2SubCommand {
// (2) Create request metadata objects to match the old data
// Iterate through all old rows, and put them into the above objects.
for x in old_records.iter() {
let mut authorization = Authorization::internal(None)
.context("failed creating internal authorization")?;
let mut authorization =
Authorization::internal().context("failed creating internal authorization")?;
match x.rpc_key_id {
Some(rpc_key_id) => {

@ -1,6 +1,7 @@
use crate::app::{flatten_handle, flatten_handles, Web3ProxyApp};
use crate::compute_units::default_usd_per_cu;
use crate::config::TopConfig;
use crate::globals::global_db_conn;
use crate::stats::FlushedStats;
use crate::{frontend, prometheus};
use argh::FromArgs;
@ -148,6 +149,10 @@ impl ProxydSubCommand {
prometheus_shutdown_receiver,
));
if spawned_app.app.config.db_url.is_some() {
// give 30 seconds for the db to connect. if it does not connect, it will keep retrying
}
info!("waiting for head block");
let max_wait_until = Instant::now() + Duration::from_secs(35);
loop {
@ -286,7 +291,8 @@ impl ProxydSubCommand {
}
}
if let Ok(db_conn) = spawned_app.app.db_conn().cloned() {
// TODO: make sure this happens even if we exit with an error
if let Ok(db_conn) = global_db_conn().await {
/*
From the sqlx docs:

@ -52,7 +52,7 @@ impl TransferKeySubCommand {
uk.user_id = sea_orm::Set(new_u.id);
let _uk = uk.save(db_conn).await?;
uk.save(db_conn).await?;
info!("changed the key's owner");
}

@ -4,20 +4,20 @@ use ethers::{
types::Address,
};
use hashbrown::HashMap;
use parking_lot::Mutex;
use serde_json::json;
use std::{
env,
str::FromStr,
sync::atomic::{AtomicU16, Ordering},
thread,
};
use std::{sync::Arc, time::Duration};
use tokio::{
runtime::Builder,
sync::{
broadcast::{self, error::SendError},
mpsc, oneshot,
},
task::JoinHandle,
time::{sleep, Instant},
};
use tracing::info;
@ -28,8 +28,9 @@ use web3_proxy::{
};
pub struct TestApp {
/// spawn handle for the proxy.
pub proxy_handle: Mutex<Option<JoinHandle<anyhow::Result<()>>>>,
/// **THREAD** (not async) handle for the proxy.
/// In an Option so we can take it and not break the `impl Drop`
pub proxy_handle: Option<thread::JoinHandle<anyhow::Result<()>>>,
/// connection to the proxy that is connected to anil.
pub proxy_provider: Provider<Http>,
@ -96,16 +97,29 @@ impl TestApp {
// spawn the app
// TODO: spawn in a thread so we can run from non-async tests and so the Drop impl can wait for it to stop
let handle = {
tokio::spawn(ProxydSubCommand::_main(
top_config,
None,
frontend_port_arc.clone(),
prometheus_port_arc,
num_workers,
shutdown_sender.clone(),
flush_stat_buffer_sender.clone(),
flush_stat_buffer_receiver,
))
let frontend_port_arc = frontend_port_arc.clone();
let prometheus_port_arc = prometheus_port_arc.clone();
let flush_stat_buffer_sender = flush_stat_buffer_sender.clone();
let shutdown_sender = shutdown_sender.clone();
thread::spawn(move || {
let runtime = Builder::new_multi_thread()
.enable_all()
.worker_threads(2)
.build()
.unwrap();
runtime.block_on(ProxydSubCommand::_main(
top_config,
None,
frontend_port_arc,
prometheus_port_arc,
num_workers,
shutdown_sender,
flush_stat_buffer_sender,
flush_stat_buffer_receiver,
))
})
};
let mut frontend_port = frontend_port_arc.load(Ordering::Relaxed);
@ -125,7 +139,7 @@ impl TestApp {
let proxy_provider = Provider::<Http>::try_from(proxy_endpoint).unwrap();
Self {
proxy_handle: Mutex::new(Some(handle)),
proxy_handle: Some(handle),
proxy_provider,
flush_stat_buffer_sender,
shutdown_sender,
@ -148,15 +162,11 @@ impl TestApp {
}
#[allow(unused)]
pub async fn wait(&self) {
pub fn wait_for_stop(mut self) {
let _ = self.stop();
// TODO: lock+take feels weird, but it works
let handle = self.proxy_handle.lock().take();
if let Some(handle) = handle {
info!("waiting for the app to stop...");
handle.await.unwrap().unwrap();
if let Some(handle) = self.proxy_handle.take() {
handle.join().unwrap();
}
}
}

@ -1,6 +1,7 @@
use crate::TestApp;
use ethers::prelude::{LocalWallet, Signer};
use ethers::types::Signature;
use http::StatusCode;
use tracing::info;
use web3_proxy::frontend::users::authentication::{LoginPostResponse, PostLogin};
use web3_proxy::sub_commands::ChangeAdminStatusSubCommand;
@ -80,17 +81,18 @@ pub async fn create_user_as_admin(
admin_wallet.address()
);
let admin_login_message = r.get(admin_login_get_url).send().await.unwrap();
let admin_login_message = admin_login_message.text().await.unwrap();
assert_eq!(admin_login_message.status(), StatusCode::OK);
let admin_login_text = admin_login_message.text().await.unwrap();
info!(?admin_login_text);
// Sign the message and POST it to login as admin
let admin_signed: Signature = admin_wallet
.sign_message(&admin_login_message)
.await
.unwrap();
let admin_signed: Signature = admin_wallet.sign_message(&admin_login_text).await.unwrap();
info!(?admin_signed);
let admin_post_login_data = PostLogin {
msg: admin_login_message,
msg: admin_login_text,
sig: admin_signed.to_string(),
referral_code: None,
};

@ -2,11 +2,12 @@ use crate::TestApp;
use entities::{user, user_tier};
use ethers::prelude::{LocalWallet, Signer};
use ethers::types::Signature;
use http::StatusCode;
use migration::sea_orm::{
self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel,
QueryFilter,
};
use tracing::info;
use tracing::{info, trace};
use web3_proxy::errors::Web3ProxyResult;
use web3_proxy::frontend::users::authentication::{LoginPostResponse, PostLogin};
@ -38,15 +39,20 @@ pub async fn create_user(
};
info!(?user_post_login_data);
let mut user_login_response = r
let user_login_response = r
.post(&login_post_url)
.json(&user_post_login_data)
.send()
.await
.unwrap()
.json::<LoginPostResponse>()
.await
.unwrap();
trace!(?user_login_response);
assert_eq!(user_login_response.status(), StatusCode::OK);
let user_login_text = user_login_response.text().await.unwrap();
trace!("user_login_text: {:#}", user_login_text);
let user_login_response: LoginPostResponse = serde_json::from_str(&user_login_text).unwrap();
info!(?user_login_response);
user_login_response

@ -8,7 +8,7 @@ use tokio::{
time::{sleep, Instant},
};
use tracing::{info, trace, warn};
use web3_proxy::relational_db::get_migrated_db;
use web3_proxy::relational_db::{connect_db, get_migrated_db};
/// on drop, the mysql docker container will be shut down
pub struct TestMysql {
@ -166,9 +166,7 @@ impl TestMysql {
}
pub async fn conn(&self) -> DatabaseConnection {
get_migrated_db(self.url.clone().unwrap(), 1, 5)
.await
.unwrap()
connect_db(self.url.clone().unwrap(), 1, 5).await.unwrap()
}
}

@ -62,7 +62,7 @@ async fn test_admin_grant_credits() {
let user_balance = user_get_balance(&x, &r, &user_login_response).await;
assert_eq!(user_balance.remaining(), Decimal::from(100));
x.wait().await;
x.wait_for_stop();
}
// #[cfg_attr(not(feature = "tests-needing-docker"), ignore)]

@ -97,8 +97,10 @@ async fn it_starts_and_stops() {
assert_eq!(anvil_result, proxy_result.unwrap());
// this won't do anything since stats aren't tracked when there isn't a db
x.flush_stats().await.unwrap_err();
let flushed = x.flush_stats().await.unwrap();
assert_eq!(flushed.relational, 0);
assert_eq!(flushed.timeseries, 0);
// most tests won't need to wait, but we should wait here to be sure all the shutdown logic works properly
x.wait().await;
x.wait_for_stop();
}