less verbose db_conn and db_replica helpers

This commit is contained in:
Bryan Stitt 2023-06-24 11:11:07 -07:00
parent 9115419ec0
commit fef03f089f
17 changed files with 158 additions and 202 deletions

@ -6,7 +6,7 @@ use tokio::time::{Duration, Instant};
pub use deadpool_redis::redis;
pub use deadpool_redis::{
Config as RedisConfig, Connection as RedisConnection, Manager as RedisManager,
Pool as RedisPool, Runtime as DeadpoolRuntime,
Pool as RedisPool, PoolError as RedisPoolError, Runtime as DeadpoolRuntime,
};
#[derive(Clone)]

@ -1,7 +1,6 @@
use crate::app::Web3ProxyApp;
use crate::errors::{Web3ProxyError, Web3ProxyResponse};
use crate::http_params::get_user_id_from_params;
use anyhow::Context;
use axum::response::IntoResponse;
use axum::{
headers::{authorization::Bearer, Authorization},
@ -43,17 +42,9 @@ pub async fn query_admin_modify_usertier<'a>(
let mut response_body = HashMap::new();
// Establish connections
let db_conn = app
.db_conn()
.context("query_admin_modify_user needs a db")?;
let db_replica = app
.db_replica()
.context("query_user_stats needs a db replica")?;
let mut redis_conn = app
.redis_conn()
.await
.context("query_admin_modify_user had a redis connection error")?
.context("query_admin_modify_user needs a redis")?;
let db_conn = app.db_conn()?;
let db_replica = app.db_replica()?;
let mut redis_conn = app.redis_conn().await?;
// Will modify logic here
@ -61,14 +52,14 @@ pub async fn query_admin_modify_usertier<'a>(
// TODO: Make a single query, where you retrieve the user, and directly from it the secondary user (otherwise we do two jumpy, which is unnecessary)
// get the user id first. if it is 0, we should use a cache on the app
let caller_id =
get_user_id_from_params(&mut redis_conn, &db_conn, &db_replica, bearer, params).await?;
get_user_id_from_params(&mut redis_conn, db_conn, db_replica, bearer, params).await?;
trace!("Caller id is: {:?}", caller_id);
// Check if the caller is an admin (i.e. if he is in an admin table)
let _admin: admin::Model = admin::Entity::find()
.filter(admin::Column::UserId.eq(caller_id))
.one(&db_conn)
.one(db_conn)
.await?
.ok_or(Web3ProxyError::AccessDenied)?;
@ -77,7 +68,7 @@ pub async fn query_admin_modify_usertier<'a>(
// Fetch the admin, and the user
let user: user::Model = user::Entity::find()
.filter(user::Column::Address.eq(user_address.as_bytes()))
.one(&db_conn)
.one(db_conn)
.await?
.ok_or(Web3ProxyError::BadRequest(
"No user with this id found".into(),
@ -91,7 +82,7 @@ pub async fn query_admin_modify_usertier<'a>(
// Now we can modify the user's tier
let new_user_tier: user_tier::Model = user_tier::Entity::find()
.filter(user_tier::Column::Title.eq(user_tier_title.clone()))
.one(&db_conn)
.one(db_conn)
.await?
.ok_or(Web3ProxyError::BadRequest(
"User Tier name was not found".into(),
@ -104,7 +95,7 @@ pub async fn query_admin_modify_usertier<'a>(
user.user_tier_id = sea_orm::Set(new_user_tier.id);
user.save(&db_conn).await?;
user.save(db_conn).await?;
info!("user's tier changed");
}
@ -112,7 +103,7 @@ pub async fn query_admin_modify_usertier<'a>(
// Now delete all bearer tokens of this user
login::Entity::delete_many()
.filter(login::Column::UserId.eq(user.id))
.exec(&db_conn)
.exec(db_conn)
.await?;
Ok(Json(&response_body).into_response())

@ -515,7 +515,7 @@ impl Web3ProxyApp {
Some(watch_consensus_head_sender),
)
.await
.context("spawning balanced rpcs")?;
.web3_context("spawning balanced rpcs")?;
app_handles.push(balanced_handle);
@ -546,7 +546,7 @@ impl Web3ProxyApp {
None,
)
.await
.context("spawning private_rpcs")?;
.web3_context("spawning private_rpcs")?;
app_handles.push(private_handle);
@ -573,7 +573,7 @@ impl Web3ProxyApp {
None,
)
.await
.context("spawning bundler_4337_rpcs")?;
.web3_context("spawning bundler_4337_rpcs")?;
app_handles.push(bundler_4337_rpcs_handle);
@ -646,7 +646,7 @@ impl Web3ProxyApp {
new_top_config_receiver
.changed()
.await
.context("failed awaiting top_config change")?;
.web3_context("failed awaiting top_config change")?;
}
});
@ -682,14 +682,14 @@ impl Web3ProxyApp {
self.balanced_rpcs
.apply_server_configs(self, new_top_config.balanced_rpcs)
.await
.context("updating balanced rpcs")?;
.web3_context("updating balanced rpcs")?;
if let Some(private_rpc_configs) = new_top_config.private_rpcs {
if let Some(ref private_rpcs) = self.private_rpcs {
private_rpcs
.apply_server_configs(self, private_rpc_configs)
.await
.context("updating private_rpcs")?;
.web3_context("updating private_rpcs")?;
} else {
// TODO: maybe we should have private_rpcs just be empty instead of being None
todo!("handle toggling private_rpcs")
@ -701,7 +701,7 @@ impl Web3ProxyApp {
bundler_4337_rpcs
.apply_server_configs(self, bundler_4337_rpc_configs)
.await
.context("updating bundler_4337_rpcs")?;
.web3_context("updating bundler_4337_rpcs")?;
} else {
// TODO: maybe we should have bundler_4337_rpcs just be empty instead of being None
todo!("handle toggling bundler_4337_rpcs")
@ -731,8 +731,8 @@ impl Web3ProxyApp {
#[derive(Default, Serialize)]
struct UserCount(i64);
let user_count: UserCount = if let Some(db) = self.db_conn() {
match user::Entity::find().count(&db).await {
let user_count: UserCount = if let Ok(db) = self.db_conn() {
match user::Entity::find().count(db).await {
Ok(user_count) => UserCount(user_count as i64),
Err(err) => {
warn!("unable to count users: {:?}", err);
@ -767,7 +767,7 @@ impl Web3ProxyApp {
RecentCounts,
RecentCounts,
) = match self.redis_conn().await {
Ok(Some(mut redis_conn)) => {
Ok(mut redis_conn) => {
// TODO: delete any hash entries where
const ONE_MINUTE: i64 = 60;
const ONE_HOUR: i64 = ONE_MINUTE * 60;
@ -857,11 +857,6 @@ impl Web3ProxyApp {
}
}
}
Ok(None) => (
RecentCounts::default(),
RecentCounts::default(),
RecentCounts::default(),
),
Err(err) => {
warn!("unable to connect to redis while counting users: {:?}", err);
(
@ -898,7 +893,7 @@ impl Web3ProxyApp {
method: &str,
params: P,
) -> Web3ProxyResult<R> {
let db_conn = self.db_conn();
let db_conn = self.db_conn().ok().cloned();
let authorization = Arc::new(Authorization::internal(db_conn)?);
@ -1021,10 +1016,9 @@ impl Web3ProxyApp {
Ok((collected, collected_rpcs))
}
/// TODO: i don't think we want or need this. just use app.db_conn, or maybe app.db_conn.clone() or app.db_conn.as_ref()
#[inline]
pub fn db_conn(&self) -> Option<DatabaseConnection> {
self.db_conn.clone()
pub fn db_conn(&self) -> Web3ProxyResult<&DatabaseConnection> {
self.db_conn.as_ref().ok_or(Web3ProxyError::NoDatabase)
}
#[inline]
@ -1038,18 +1032,18 @@ impl Web3ProxyApp {
}
#[inline]
pub fn db_replica(&self) -> Option<DatabaseReplica> {
self.db_replica.clone()
pub fn db_replica(&self) -> Web3ProxyResult<&DatabaseReplica> {
self.db_replica.as_ref().ok_or(Web3ProxyError::NoDatabase)
}
pub async fn redis_conn(&self) -> anyhow::Result<Option<redis_rate_limiter::RedisConnection>> {
pub async fn redis_conn(&self) -> Web3ProxyResult<redis_rate_limiter::RedisConnection> {
match self.vredis_pool.as_ref() {
// TODO: don't do an error. return None
None => Ok(None),
None => Err(Web3ProxyError::NoDatabase),
Some(redis_pool) => {
let redis_conn = redis_pool.get().await?;
// TODO: add a From for this
let redis_conn = redis_pool.get().await.context("redis pool error")?;
Ok(Some(redis_conn))
Ok(redis_conn)
}
}
}
@ -1458,7 +1452,7 @@ impl Web3ProxyApp {
let f = async move {
match app.redis_conn().await {
Ok(Some(mut redis_conn)) => {
Ok(mut redis_conn) => {
let hashed_tx_hash =
Bytes::from(keccak256(salted_tx_hash.as_bytes()));
@ -1469,7 +1463,7 @@ impl Web3ProxyApp {
.zadd(recent_tx_hash_key, hashed_tx_hash.to_string(), now)
.await?;
}
Ok(None) => {}
Err(Web3ProxyError::NoDatabase) => {},
Err(err) => {
warn!(
"unable to save stats for eth_sendRawTransaction: {:?}",

@ -125,7 +125,7 @@ impl UserImportSubCommand {
for import_u in us.into_iter() {
// first, check if a user already exists with this address
if let Some(existing_u) = user::Entity::find()
.filter(user::Column::Address.eq(import_u.address))
.filter(user::Column::Address.eq(import_u.address.clone()))
.one(db_conn)
.await?
{

@ -14,9 +14,11 @@ use axum::{
use derive_more::{Display, Error, From};
use ethers::prelude::ContractError;
use http::header::InvalidHeaderValue;
use http::uri::InvalidUri;
use ipnet::AddrParseError;
use migration::sea_orm::DbErr;
use redis_rate_limiter::redis::RedisError;
use redis_rate_limiter::RedisPoolError;
use reqwest::header::ToStrError;
use rust_decimal::Error as DecimalError;
use serde::Serialize;
@ -63,6 +65,7 @@ pub enum Web3ProxyError {
HdrRecord(hdrhistogram::errors::RecordError),
Headers(headers::Error),
HeaderToString(ToStrError),
HttpUri(InvalidUri),
Hyper(hyper::Error),
InfluxDb2Request(influxdb2::RequestError),
#[display(fmt = "{} > {}", min, max)]
@ -112,6 +115,7 @@ pub enum Web3ProxyError {
},
NotFound,
NotImplemented,
NoVolatileRedisDatabase,
OriginRequired,
#[error(ignore)]
#[from(ignore)]
@ -124,6 +128,7 @@ pub enum Web3ProxyError {
#[display(fmt = "{:?}, {:?}", _0, _1)]
RateLimited(Authorization, Option<Instant>),
Redis(RedisError),
RedisDeadpool(RedisPoolError),
RefererRequired,
#[display(fmt = "{:?}", _0)]
#[error(ignore)]
@ -355,6 +360,17 @@ impl Web3ProxyError {
},
)
}
Self::HttpUri(err) => {
trace!("HttpUri {:#?}", err);
(
StatusCode::BAD_REQUEST,
JsonRpcErrorData {
message: err.to_string().into(),
code: StatusCode::BAD_REQUEST.as_u16().into(),
data: None,
},
)
}
Self::Hyper(err) => {
warn!("hyper err={:#?}", err);
(
@ -461,6 +477,18 @@ impl Web3ProxyError {
},
)
}
Self::RedisDeadpool(err) => {
error!("redis deadpool err={:#?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcErrorData {
// TODO: is it safe to expose our io error strings?
message: err.to_string().into(),
code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(),
data: None,
},
)
}
Self::UnknownReferralCode => {
trace!("UnknownReferralCode");
(
@ -616,6 +644,17 @@ impl Web3ProxyError {
},
)
}
Self::NoVolatileRedisDatabase => {
error!("no volatile redis database configured");
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcErrorData {
message: "no volatile redis database configured!".into(),
code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(),
data: None,
},
)
}
Self::NoServersSynced => {
warn!("NoServersSynced");
(

@ -175,17 +175,7 @@ pub async fn admin_login_get(
let issued_at = OffsetDateTime::now_utc();
let expiration_time = issued_at.add(Duration::new(expire_seconds as i64, 0));
// The admin user is the one that basically logs in, on behalf of the user
// This will generate a login id for the admin, which we will be caching ...
// I suppose with this, the admin can be logged in to one session at a time
// let (caller, _semaphore) = app.bearer_is_authorized(bearer_token).await?;
// Finally, check if the user is an admin. If he is, return "true" as the third triplet.
// TODO: consider wrapping the output in a struct, instead of a triplet
// TODO: Could try to merge this into the above query ...
// This query will fail if it's not the admin...
// get the admin field ...
// get the admin's address
let admin_address: Address = params
.get("admin_address")
.ok_or_else(|| {
@ -196,7 +186,7 @@ pub async fn admin_login_get(
Web3ProxyError::BadRequest("Unable to parse admin_address as an Address".into())
})?;
// Fetch the user_address parameter from the login string ... (as who we want to be logging in ...)
// get the address who we want to be logging in as
let user_address: Address = params
.get("user_address")
.ok_or_else(|| {
@ -208,29 +198,26 @@ pub async fn admin_login_get(
})?;
// We want to login to llamanodes.com
let login_domain = app
let domain = app
.config
.login_domain
.as_deref()
.unwrap_or("llamanodes.com");
// Also there must basically be a token, that says that one admin logins _as a user_.
// I'm not yet fully sure how to handle with that logic specifically ...
let message_domain = domain.parse()?;
// TODO: don't unwrap
let message_uri = format!("https://{}/", domain).parse().unwrap();
// TODO: get most of these from the app config
// TODO: Let's check again who the message needs to be signed by;
// if the message does not have to be signed by the user, include the user ...
let message = Message {
// TODO: don't unwrap
// TODO: accept a login_domain from the request?
domain: login_domain.parse().unwrap(),
domain: message_domain,
// the admin needs to sign the message, not the imitated user
address: admin_address.to_fixed_bytes(),
// TODO: config for statement
statement: Some("🦙🦙🦙🦙🦙".to_string()),
// TODO: don't unwrap
uri: format!("https://{}/", login_domain).parse().unwrap(),
statement: Some("👑👑👑👑👑".to_string()),
uri: message_uri,
version: siwe::Version::V1,
chain_id: 1,
chain_id: app.config.chain_id,
expiration_time: Some(expiration_time.into()),
issued_at: issued_at.into(),
nonce: nonce.to_string(),
@ -239,16 +226,14 @@ pub async fn admin_login_get(
resources: vec![],
};
let db_conn = app.db_conn().web3_context("login requires a database")?;
let db_replica = app
.db_replica()
.web3_context("login requires a replica database")?;
let db_conn = app.db_conn()?;
let db_replica = app.db_replica()?;
// delete ALL expired rows.
let now = Utc::now();
let delete_result = pending_login::Entity::delete_many()
.filter(pending_login::Column::ExpiresAt.lte(now))
.exec(&db_conn)
.exec(db_conn)
.await?;
// TODO: emit a stat? if this is high something weird might be happening
@ -283,7 +268,7 @@ pub async fn admin_login_get(
..Default::default()
};
trail
.save(&db_conn)
.save(db_conn)
.await
.web3_context("saving user's pending_login")?;
@ -308,7 +293,7 @@ pub async fn admin_login_get(
};
user_pending_login
.save(&db_conn)
.save(db_conn)
.await
.web3_context("saving an admin trail pre login")?;
@ -385,9 +370,7 @@ pub async fn admin_login_post(
})?;
// fetch the message we gave them from our database
let db_replica = app
.db_replica()
.web3_context("Getting database connection")?;
let db_replica = app.db_replica()?;
// massage type for the db
let login_nonce_uuid: Uuid = login_nonce.clone().into();
@ -434,9 +417,7 @@ pub async fn admin_login_post(
.await?
.web3_context("admin address was not found!")?;
let db_conn = app
.db_conn()
.web3_context("deleting expired pending logins requires a db")?;
let db_conn = app.db_conn()?;
// Add a message that the admin has logged in
// Note that the admin is trying to log in as this user
@ -448,7 +429,7 @@ pub async fn admin_login_post(
..Default::default()
};
trail
.save(&db_conn)
.save(db_conn)
.await
.web3_context("saving an admin trail post login")?;
@ -497,15 +478,11 @@ pub async fn admin_login_post(
};
user_login
.save(&db_conn)
.save(db_conn)
.await
.web3_context("saving user login")?;
if let Err(err) = user_pending_login
.into_active_model()
.delete(&db_conn)
.await
{
if let Err(err) = user_pending_login.into_active_model().delete(db_conn).await {
warn!("Failed to delete nonce:{}: {}", login_nonce.0, err);
}
@ -521,13 +498,11 @@ pub async fn admin_logout_post(
) -> Web3ProxyResponse {
let user_bearer = UserBearerToken::try_from(bearer)?;
let db_conn = app
.db_conn()
.web3_context("database needed for user logout")?;
let db_conn = app.db_conn()?;
if let Err(err) = login::Entity::delete_many()
.filter(login::Column::BearerToken.eq(user_bearer.uuid()))
.exec(&db_conn)
.exec(db_conn)
.await
{
debug!("Failed to delete {}: {}", user_bearer.redis_key(), err);
@ -538,7 +513,7 @@ pub async fn admin_logout_post(
// also delete any expired logins
let delete_result = login::Entity::delete_many()
.filter(login::Column::ExpiresAt.lte(now))
.exec(&db_conn)
.exec(db_conn)
.await;
debug!("Deleted expired logins: {:?}", delete_result);
@ -546,7 +521,7 @@ pub async fn admin_logout_post(
// also delete any expired pending logins
let delete_result = login::Entity::delete_many()
.filter(login::Column::ExpiresAt.lte(now))
.exec(&db_conn)
.exec(db_conn)
.await;
debug!("Deleted expired pending logins: {:?}", delete_result);

@ -859,7 +859,7 @@ pub async fn ip_is_authorized(
let f = async move {
let now = Utc::now().timestamp();
if let Some(mut redis_conn) = app.redis_conn().await? {
if let Ok(mut redis_conn) = app.redis_conn().await {
let salt = app
.config
.public_recent_ips_salt
@ -923,7 +923,7 @@ pub async fn key_is_authorized(
let f = async move {
let now = Utc::now().timestamp();
if let Some(mut redis_conn) = app.redis_conn().await? {
if let Ok(mut redis_conn) = app.redis_conn().await {
let salt = app
.config
.public_recent_ips_salt
@ -1027,9 +1027,7 @@ impl Web3ProxyApp {
let semaphore_permit = semaphore.acquire_owned().await?;
// get the attached address from the database for the given auth_token.
let db_replica = self
.db_replica()
.web3_context("checking if bearer token is authorized")?;
let db_replica = self.db_replica()?;
let user_bearer_uuid: Uuid = user_bearer_token.into();
@ -1054,7 +1052,7 @@ impl Web3ProxyApp {
// we don't care about user agent or origin or referer
let authorization = Authorization::external(
&self.config.allowed_origin_requests_per_period,
self.db_conn(),
self.db_conn().ok().cloned(),
&ip,
None,
proxy_mode,
@ -1110,7 +1108,7 @@ impl Web3ProxyApp {
) -> Web3ProxyResult<RateLimitResult> {
if ip.is_loopback() {
// TODO: localhost being unlimited should be optional
let authorization = Authorization::internal(self.db_conn())?;
let authorization = Authorization::internal(self.db_conn().ok().cloned())?;
return Ok(RateLimitResult::Allowed(authorization, None));
}
@ -1119,7 +1117,7 @@ impl Web3ProxyApp {
// they do check origin because we can override rate limits for some origins
let authorization = Authorization::external(
allowed_origin_requests_per_period,
self.db_conn(),
self.db_conn().ok().cloned(),
ip,
origin,
proxy_mode,
@ -1181,9 +1179,7 @@ impl Web3ProxyApp {
Ok(x) => self
.user_balance_cache
.try_get_with(x, async move {
let db_replica = self
.db_replica()
.web3_context("Getting database replica connection")?;
let db_replica = self.db_replica()?;
loop {
match balance::Entity::find()
@ -1201,8 +1197,7 @@ impl Web3ProxyApp {
}
None => {
// no balance row. make one now
let db_conn =
self.db_conn().web3_context("Getting database connection")?;
let db_conn = self.db_conn()?;
let balance_entry = balance::ActiveModel {
id: sea_orm::NotSet,
@ -1219,7 +1214,7 @@ impl Web3ProxyApp {
)])
.to_owned(),
)
.exec(&db_conn)
.exec(db_conn)
.await
.web3_context("creating empty balance row for existing user")?;
@ -1243,9 +1238,7 @@ impl Web3ProxyApp {
.try_get_with_by_ref(rpc_secret_key, async move {
// trace!(?rpc_secret_key, "user cache miss");
let db_replica = self
.db_replica()
.web3_context("Getting database connection")?;
let db_replica = self.db_replica()?;
// TODO: join the user table to this to return the User? we don't always need it
// TODO: join on secondary users
@ -1322,7 +1315,7 @@ impl Web3ProxyApp {
let user_model = user::Entity::find_by_id(rpc_key_model.user_id)
.one(db_replica.as_ref())
.await?
.context(
.web3_context(
"user model was not found, but every rpc_key should have a user",
)?;
@ -1331,7 +1324,7 @@ impl Web3ProxyApp {
)
.one(db_replica.as_ref())
.await?
.context(
.web3_context(
"related user tier not found, but every user should have a tier",
)?;
@ -1351,7 +1344,7 @@ impl Web3ProxyApp {
user_tier::Entity::find_by_id(downgrade_user_tier)
.one(db_replica.as_ref())
.await?
.context(format!(
.web3_context(format!(
"downgrade user tier ({}) is missing!",
downgrade_user_tier
))?;
@ -1411,7 +1404,7 @@ impl Web3ProxyApp {
let authorization = Authorization::try_new(
authorization_checks,
self.db_conn(),
self.db_conn().ok().cloned(),
ip,
origin,
referer,

@ -74,24 +74,24 @@ pub async fn user_login_get(
.parse()
.or(Err(Web3ProxyError::ParseAddressError))?;
let login_domain = app
let domain = app
.config
.login_domain
.clone()
.unwrap_or_else(|| "llamanodes.com".to_string());
let message_domain = domain.parse().unwrap();
let message_uri = format!("https://{}/", domain).parse().unwrap();
// TODO: get most of these from the app config
let message = Message {
// TODO: don't unwrap
// TODO: accept a login_domain from the request?
domain: login_domain.parse().unwrap(),
domain: message_domain,
address: user_address.to_fixed_bytes(),
// TODO: config for statement
statement: Some("🦙🦙🦙🦙🦙".to_string()),
// TODO: don't unwrap
uri: format!("https://{}/", login_domain).parse().unwrap(),
uri: message_uri,
version: siwe::Version::V1,
chain_id: 1,
chain_id: app.config.chain_id,
expiration_time: Some(expiration_time.into()),
issued_at: issued_at.into(),
nonce: nonce.to_string(),
@ -100,13 +100,13 @@ pub async fn user_login_get(
resources: vec![],
};
let db_conn = app.db_conn().web3_context("login requires a database")?;
let db_conn = app.db_conn()?;
// delete ALL expired rows.
let now = Utc::now();
let _ = pending_login::Entity::delete_many()
.filter(pending_login::Column::ExpiresAt.lte(now))
.exec(&db_conn)
.exec(db_conn)
.await?;
// massage types to fit in the database. sea-orm does not make this very elegant
@ -127,7 +127,7 @@ pub async fn user_login_get(
};
user_pending_login
.save(&db_conn)
.save(db_conn)
.await
.web3_context("saving user's pending_login")?;
@ -236,9 +236,7 @@ pub async fn user_login_post(
let login_nonce = UserBearerToken::from_str(&their_msg.nonce)?;
// fetch the message we gave them from our database
let db_replica = app
.db_replica()
.web3_context("Getting database connection")?;
let db_replica = app.db_replica()?;
let user_pending_login = pending_login::Entity::find()
.filter(pending_login::Column::Nonce.eq(Uuid::from(login_nonce.clone())))
@ -270,7 +268,7 @@ pub async fn user_login_post(
.one(db_replica.as_ref())
.await?;
let db_conn = app.db_conn().web3_context("login requires a db")?;
let db_conn = app.db_conn()?;
let (caller, user_rpc_keys, status_code) = match caller {
None => {
@ -357,7 +355,7 @@ pub async fn user_login_post(
// the user is already registered
let user_rpc_keys = rpc_key::Entity::find()
.filter(rpc_key::Column::UserId.eq(caller.id))
.all(&db_conn)
.all(db_conn)
.await
.web3_context("failed loading user's key")?;
@ -397,15 +395,11 @@ pub async fn user_login_post(
};
user_login
.save(&db_conn)
.save(db_conn)
.await
.web3_context("saving user login")?;
if let Err(err) = user_pending_login
.into_active_model()
.delete(&db_conn)
.await
{
if let Err(err) = user_pending_login.into_active_model().delete(db_conn).await {
error!("Failed to delete nonce:{}: {}", login_nonce, err);
}
@ -420,13 +414,11 @@ pub async fn user_logout_post(
) -> Web3ProxyResponse {
let user_bearer = UserBearerToken::try_from(bearer)?;
let db_conn = app
.db_conn()
.web3_context("database needed for user logout")?;
let db_conn = app.db_conn()?;
if let Err(err) = login::Entity::delete_many()
.filter(login::Column::BearerToken.eq(user_bearer.uuid()))
.exec(&db_conn)
.exec(db_conn)
.await
{
warn!("Failed to delete {}: {}", user_bearer.redis_key(), err);
@ -437,7 +429,7 @@ pub async fn user_logout_post(
// also delete any expired logins
let delete_result = login::Entity::delete_many()
.filter(login::Column::ExpiresAt.lte(now))
.exec(&db_conn)
.exec(db_conn)
.await;
trace!("Deleted expired logins: {:?}", delete_result);
@ -445,7 +437,7 @@ pub async fn user_logout_post(
// also delete any expired pending logins
let delete_result = login::Entity::delete_many()
.filter(login::Column::ExpiresAt.lte(now))
.exec(&db_conn)
.exec(db_conn)
.await;
trace!("Deleted expired pending logins: {:?}", delete_result);

@ -45,7 +45,7 @@ pub async fn user_balance_get(
) -> Web3ProxyResponse {
let (_user, _semaphore) = app.bearer_is_authorized(bearer).await?;
let db_replica = app.db_replica().context("Getting database connection")?;
let db_replica = app.db_replica()?;
// Just return the balance for the user
let user_balance = balance::Entity::find()
@ -73,7 +73,7 @@ pub async fn user_deposits_get(
) -> Web3ProxyResponse {
let (user, _semaphore) = app.bearer_is_authorized(bearer).await?;
let db_replica = app.db_replica().context("Getting database connection")?;
let db_replica = app.db_replica()?;
// Filter by user ...
let receipts = increase_on_chain_balance_receipt::Entity::find()
@ -114,7 +114,7 @@ pub async fn user_balance_post(
let (_, semaphore) = app.bearer_is_authorized(bearer).await?;
// TODO: is handling this as internal fine?
let authorization = Web3ProxyAuthorization::internal(app.db_conn())?;
let authorization = Web3ProxyAuthorization::internal(app.db_conn().ok().cloned())?;
(authorization, Some(semaphore))
} else if let Some(InsecureClientIp(ip)) = ip {
@ -136,7 +136,7 @@ pub async fn user_balance_post(
Web3ProxyError::BadRequest(format!("unable to parse tx_hash: {}", err).into())
})?;
let db_conn = app.db_conn().context("query_user_stats needs a db")?;
let db_conn = app.db_conn()?;
let authorization = Arc::new(authorization);
@ -172,7 +172,7 @@ pub async fn user_balance_post(
true
};
let uncle_hashes = find_uncles.all(&db_conn).await?;
let uncle_hashes = find_uncles.all(db_conn).await?;
let uncle_hashes: HashSet<_> = uncle_hashes
.into_iter()

@ -35,9 +35,7 @@ pub async fn user_referral_link_get(
// First get the bearer token and check if the user is logged in
let (user, _semaphore) = app.bearer_is_authorized(bearer).await?;
let db_replica = app
.db_replica()
.context("getting replica db for user's revert logs")?;
let db_replica = app.db_replica()?;
// Then get the referral token. If one doesn't exist, create one
let user_referrer = referrer::Entity::find()
@ -49,7 +47,7 @@ pub async fn user_referral_link_get(
Some(x) => (x.referral_code, StatusCode::OK),
None => {
// Connect to the database for writes
let db_conn = app.db_conn().context("getting db_conn")?;
let db_conn = app.db_conn()?;
let referral_code = ReferralCode::default().to_string();
@ -58,7 +56,7 @@ pub async fn user_referral_link_get(
referral_code: sea_orm::ActiveValue::Set(referral_code.clone()),
..Default::default()
};
referrer_entry.save(&db_conn).await?;
referrer_entry.save(db_conn).await?;
(referral_code, StatusCode::CREATED)
}
@ -82,9 +80,7 @@ pub async fn user_used_referral_stats(
// First get the bearer token and check if the user is logged in
let (user, _semaphore) = app.bearer_is_authorized(bearer).await?;
let db_replica = app
.db_replica()
.context("getting replica db for user's revert logs")?;
let db_replica = app.db_replica()?;
// Get all referral records associated with this user
let referrals = referee::Entity::find()
@ -142,9 +138,7 @@ pub async fn user_shared_referral_stats(
// First get the bearer token and check if the user is logged in
let (user, _semaphore) = app.bearer_is_authorized(bearer).await?;
let db_replica = app
.db_replica()
.context("getting replica db for user's revert logs")?;
let db_replica = app.db_replica()?;
// Get all referral records associated with this user
let query_result = referrer::Entity::find()

@ -31,9 +31,7 @@ pub async fn rpc_keys_get(
) -> Web3ProxyResponse {
let (user, _semaphore) = app.bearer_is_authorized(bearer).await?;
let db_replica = app
.db_replica()
.web3_context("db_replica is required to fetch a user's keys")?;
let db_replica = app.db_replica()?;
// This is basically completely copied from sea-orm. Not optimal, but it keeps the format identical to before (while adding the final key)
// We could also pack the below stuff into it's subfield, but then we would destroy the format. Both options are fine for now though
@ -162,9 +160,7 @@ pub async fn rpc_keys_management(
let (user, _semaphore) = app.bearer_is_authorized(bearer).await?;
let db_replica = app
.db_replica()
.web3_context("getting db for user's keys")?;
let db_replica = app.db_replica()?;
let mut uk = match payload.key_id {
Some(existing_key_id) => {
@ -342,9 +338,9 @@ pub async fn rpc_keys_management(
}
let uk = if uk.is_changed() {
let db_conn = app.db_conn().web3_context("login requires a db")?;
let db_conn = app.db_conn()?;
uk.save(&db_conn)
uk.save(db_conn)
.await
.web3_context("Failed saving user key")?
} else {

@ -46,9 +46,7 @@ pub async fn user_revert_logs_get(
response.insert("chain_id", json!(chain_id));
response.insert("query_start", json!(query_start.timestamp() as u64));
let db_replica = app
.db_replica()
.web3_context("getting replica db for user's revert logs")?;
let db_replica = app.db_replica()?;
let uks = rpc_key::Entity::find()
.filter(rpc_key::Column::UserId.eq(user.id))

@ -35,9 +35,7 @@ pub async fn get_keys_as_subuser(
// First, authenticate
let (subuser, _semaphore) = app.bearer_is_authorized(bearer).await?;
let db_replica = app
.db_replica()
.context("getting replica db for user's revert logs")?;
let db_replica = app.db_replica()?;
// TODO: JOIN over RPC_KEY, SUBUSER, PRIMARY_USER and return these items
@ -102,9 +100,7 @@ pub async fn get_subusers(
// First, authenticate
let (user, _semaphore) = app.bearer_is_authorized(bearer).await?;
let db_replica = app
.db_replica()
.context("getting replica db for user's revert logs")?;
let db_replica = app.db_replica()?;
let rpc_key: u64 = params
.remove("key_id")
@ -176,9 +172,7 @@ pub async fn modify_subuser(
// First, authenticate
let (user, _semaphore) = app.bearer_is_authorized(bearer).await?;
let db_replica = app
.db_replica()
.context("getting replica db for user's revert logs")?;
let db_replica = app.db_replica()?;
trace!("Parameters are: {:?}", params);
@ -262,7 +256,7 @@ pub async fn modify_subuser(
}
// TODO: There is a good chunk of duplicate logic as login-post. Consider refactoring ...
let db_conn = app.db_conn().web3_context("login requires a db")?;
let db_conn = app.db_conn()?;
let (subuser, _subuser_rpc_keys, _status_code) = match subuser {
None => {
let txn = db_conn.begin().await?;
@ -344,12 +338,12 @@ pub async fn modify_subuser(
let mut active_subuser_entry_secondary_user = secondary_user.into_active_model();
if !keep_subuser {
// Remove the user
active_subuser_entry_secondary_user.delete(&db_conn).await?;
active_subuser_entry_secondary_user.delete(db_conn).await?;
action = "removed";
} else {
// Just change the role
active_subuser_entry_secondary_user.role = sea_orm::Set(new_role.clone());
active_subuser_entry_secondary_user.save(&db_conn).await?;
active_subuser_entry_secondary_user.save(db_conn).await?;
action = "role modified";
}
}

@ -43,8 +43,7 @@ pub async fn get_user_id_from_params(
let user_login = login::Entity::find()
.filter(login::Column::BearerToken.eq(user_bearer_token.uuid()))
.one(db_replica.as_ref())
.await
.context("database error while querying for user")?
.await?
.ok_or(Web3ProxyError::AccessDenied)?;
// if expired, delete ALL expired logins

@ -194,7 +194,7 @@ impl Web3Rpcs {
return None;
}
let db_conn = app.db_conn();
let db_conn = app.db_conn().ok().cloned();
let http_client = app.http_client.clone();
let vredis_pool = app.vredis_pool.clone();

@ -5,7 +5,6 @@ use crate::http_params::{
get_chain_id_from_params, get_page_from_params, get_query_start_from_params,
get_query_window_seconds_from_params, get_user_id_from_params,
};
use anyhow::Context;
use axum::response::IntoResponse;
use axum::Json;
use axum::{
@ -62,19 +61,13 @@ pub async fn query_user_stats<'a>(
params: &'a HashMap<String, String>,
stat_response_type: StatType,
) -> Web3ProxyResponse {
let db_conn = app.db_conn().context("query_user_stats needs a db")?;
let db_replica = app
.db_replica()
.context("query_user_stats needs a db replica")?;
let mut redis_conn = app
.redis_conn()
.await
.context("query_user_stats had a redis connection error")?
.context("query_user_stats needs a redis")?;
let db_conn = app.db_conn()?;
let db_replica = app.db_replica()?;
let mut redis_conn = app.redis_conn().await?;
// get the user id first. if it is 0, we should use a cache on the app
let user_id =
get_user_id_from_params(&mut redis_conn, &db_conn, &db_replica, bearer, params).await?;
get_user_id_from_params(&mut redis_conn, db_conn, db_replica, bearer, params).await?;
// get the query window seconds now so that we can pick a cache with a good TTL
// TODO: for now though, just do one cache. its easier
let query_window_seconds = get_query_window_seconds_from_params(params)?;

@ -46,9 +46,7 @@ pub async fn query_user_stats<'a>(
));
}
let db_replica = app
.db_replica()
.context("query_user_stats needs a db replica")?;
let db_replica = app.db_replica()?;
// TODO: have a getter for this. do we need a connection pool on it?
let influxdb_client = app