From fef03f089f6c4de01bd21ee753b901bd9ff7403e Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sat, 24 Jun 2023 11:11:07 -0700 Subject: [PATCH] less verbose db_conn and db_replica helpers --- redis-rate-limiter/src/lib.rs | 2 +- web3_proxy/src/admin_queries.rs | 27 +++---- web3_proxy/src/app/mod.rs | 50 ++++++------- .../src/bin/web3_proxy_cli/user_import.rs | 2 +- web3_proxy/src/errors.rs | 39 ++++++++++ web3_proxy/src/frontend/admin.rs | 75 +++++++------------ web3_proxy/src/frontend/authorization.rs | 35 ++++----- .../src/frontend/users/authentication.rs | 46 +++++------- web3_proxy/src/frontend/users/payment.rs | 10 +-- web3_proxy/src/frontend/users/referral.rs | 16 ++-- web3_proxy/src/frontend/users/rpc_keys.rs | 12 +-- web3_proxy/src/frontend/users/stats.rs | 4 +- web3_proxy/src/frontend/users/subuser.rs | 18 ++--- web3_proxy/src/http_params.rs | 3 +- web3_proxy/src/rpcs/many.rs | 2 +- web3_proxy/src/stats/db_queries.rs | 15 +--- web3_proxy/src/stats/influxdb_queries.rs | 4 +- 17 files changed, 158 insertions(+), 202 deletions(-) diff --git a/redis-rate-limiter/src/lib.rs b/redis-rate-limiter/src/lib.rs index 551584ad..5fdfc9e2 100644 --- a/redis-rate-limiter/src/lib.rs +++ b/redis-rate-limiter/src/lib.rs @@ -6,7 +6,7 @@ use tokio::time::{Duration, Instant}; pub use deadpool_redis::redis; pub use deadpool_redis::{ Config as RedisConfig, Connection as RedisConnection, Manager as RedisManager, - Pool as RedisPool, Runtime as DeadpoolRuntime, + Pool as RedisPool, PoolError as RedisPoolError, Runtime as DeadpoolRuntime, }; #[derive(Clone)] diff --git a/web3_proxy/src/admin_queries.rs b/web3_proxy/src/admin_queries.rs index e720a76b..a3b956be 100644 --- a/web3_proxy/src/admin_queries.rs +++ b/web3_proxy/src/admin_queries.rs @@ -1,7 +1,6 @@ use crate::app::Web3ProxyApp; use crate::errors::{Web3ProxyError, Web3ProxyResponse}; use crate::http_params::get_user_id_from_params; -use anyhow::Context; use axum::response::IntoResponse; use axum::{ headers::{authorization::Bearer, Authorization}, @@ -43,17 +42,9 @@ pub async fn query_admin_modify_usertier<'a>( let mut response_body = HashMap::new(); // Establish connections - let db_conn = app - .db_conn() - .context("query_admin_modify_user needs a db")?; - let db_replica = app - .db_replica() - .context("query_user_stats needs a db replica")?; - let mut redis_conn = app - .redis_conn() - .await - .context("query_admin_modify_user had a redis connection error")? - .context("query_admin_modify_user needs a redis")?; + let db_conn = app.db_conn()?; + let db_replica = app.db_replica()?; + let mut redis_conn = app.redis_conn().await?; // Will modify logic here @@ -61,14 +52,14 @@ pub async fn query_admin_modify_usertier<'a>( // TODO: Make a single query, where you retrieve the user, and directly from it the secondary user (otherwise we do two jumpy, which is unnecessary) // get the user id first. if it is 0, we should use a cache on the app let caller_id = - get_user_id_from_params(&mut redis_conn, &db_conn, &db_replica, bearer, params).await?; + get_user_id_from_params(&mut redis_conn, db_conn, db_replica, bearer, params).await?; trace!("Caller id is: {:?}", caller_id); // Check if the caller is an admin (i.e. if he is in an admin table) let _admin: admin::Model = admin::Entity::find() .filter(admin::Column::UserId.eq(caller_id)) - .one(&db_conn) + .one(db_conn) .await? .ok_or(Web3ProxyError::AccessDenied)?; @@ -77,7 +68,7 @@ pub async fn query_admin_modify_usertier<'a>( // Fetch the admin, and the user let user: user::Model = user::Entity::find() .filter(user::Column::Address.eq(user_address.as_bytes())) - .one(&db_conn) + .one(db_conn) .await? .ok_or(Web3ProxyError::BadRequest( "No user with this id found".into(), @@ -91,7 +82,7 @@ pub async fn query_admin_modify_usertier<'a>( // Now we can modify the user's tier let new_user_tier: user_tier::Model = user_tier::Entity::find() .filter(user_tier::Column::Title.eq(user_tier_title.clone())) - .one(&db_conn) + .one(db_conn) .await? .ok_or(Web3ProxyError::BadRequest( "User Tier name was not found".into(), @@ -104,7 +95,7 @@ pub async fn query_admin_modify_usertier<'a>( user.user_tier_id = sea_orm::Set(new_user_tier.id); - user.save(&db_conn).await?; + user.save(db_conn).await?; info!("user's tier changed"); } @@ -112,7 +103,7 @@ pub async fn query_admin_modify_usertier<'a>( // Now delete all bearer tokens of this user login::Entity::delete_many() .filter(login::Column::UserId.eq(user.id)) - .exec(&db_conn) + .exec(db_conn) .await?; Ok(Json(&response_body).into_response()) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 54280661..2e10d9e5 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -515,7 +515,7 @@ impl Web3ProxyApp { Some(watch_consensus_head_sender), ) .await - .context("spawning balanced rpcs")?; + .web3_context("spawning balanced rpcs")?; app_handles.push(balanced_handle); @@ -546,7 +546,7 @@ impl Web3ProxyApp { None, ) .await - .context("spawning private_rpcs")?; + .web3_context("spawning private_rpcs")?; app_handles.push(private_handle); @@ -573,7 +573,7 @@ impl Web3ProxyApp { None, ) .await - .context("spawning bundler_4337_rpcs")?; + .web3_context("spawning bundler_4337_rpcs")?; app_handles.push(bundler_4337_rpcs_handle); @@ -646,7 +646,7 @@ impl Web3ProxyApp { new_top_config_receiver .changed() .await - .context("failed awaiting top_config change")?; + .web3_context("failed awaiting top_config change")?; } }); @@ -682,14 +682,14 @@ impl Web3ProxyApp { self.balanced_rpcs .apply_server_configs(self, new_top_config.balanced_rpcs) .await - .context("updating balanced rpcs")?; + .web3_context("updating balanced rpcs")?; if let Some(private_rpc_configs) = new_top_config.private_rpcs { if let Some(ref private_rpcs) = self.private_rpcs { private_rpcs .apply_server_configs(self, private_rpc_configs) .await - .context("updating private_rpcs")?; + .web3_context("updating private_rpcs")?; } else { // TODO: maybe we should have private_rpcs just be empty instead of being None todo!("handle toggling private_rpcs") @@ -701,7 +701,7 @@ impl Web3ProxyApp { bundler_4337_rpcs .apply_server_configs(self, bundler_4337_rpc_configs) .await - .context("updating bundler_4337_rpcs")?; + .web3_context("updating bundler_4337_rpcs")?; } else { // TODO: maybe we should have bundler_4337_rpcs just be empty instead of being None todo!("handle toggling bundler_4337_rpcs") @@ -731,8 +731,8 @@ impl Web3ProxyApp { #[derive(Default, Serialize)] struct UserCount(i64); - let user_count: UserCount = if let Some(db) = self.db_conn() { - match user::Entity::find().count(&db).await { + let user_count: UserCount = if let Ok(db) = self.db_conn() { + match user::Entity::find().count(db).await { Ok(user_count) => UserCount(user_count as i64), Err(err) => { warn!("unable to count users: {:?}", err); @@ -767,7 +767,7 @@ impl Web3ProxyApp { RecentCounts, RecentCounts, ) = match self.redis_conn().await { - Ok(Some(mut redis_conn)) => { + Ok(mut redis_conn) => { // TODO: delete any hash entries where const ONE_MINUTE: i64 = 60; const ONE_HOUR: i64 = ONE_MINUTE * 60; @@ -857,11 +857,6 @@ impl Web3ProxyApp { } } } - Ok(None) => ( - RecentCounts::default(), - RecentCounts::default(), - RecentCounts::default(), - ), Err(err) => { warn!("unable to connect to redis while counting users: {:?}", err); ( @@ -898,7 +893,7 @@ impl Web3ProxyApp { method: &str, params: P, ) -> Web3ProxyResult { - let db_conn = self.db_conn(); + let db_conn = self.db_conn().ok().cloned(); let authorization = Arc::new(Authorization::internal(db_conn)?); @@ -1021,10 +1016,9 @@ impl Web3ProxyApp { Ok((collected, collected_rpcs)) } - /// TODO: i don't think we want or need this. just use app.db_conn, or maybe app.db_conn.clone() or app.db_conn.as_ref() #[inline] - pub fn db_conn(&self) -> Option { - self.db_conn.clone() + pub fn db_conn(&self) -> Web3ProxyResult<&DatabaseConnection> { + self.db_conn.as_ref().ok_or(Web3ProxyError::NoDatabase) } #[inline] @@ -1038,18 +1032,18 @@ impl Web3ProxyApp { } #[inline] - pub fn db_replica(&self) -> Option { - self.db_replica.clone() + pub fn db_replica(&self) -> Web3ProxyResult<&DatabaseReplica> { + self.db_replica.as_ref().ok_or(Web3ProxyError::NoDatabase) } - pub async fn redis_conn(&self) -> anyhow::Result> { + pub async fn redis_conn(&self) -> Web3ProxyResult { match self.vredis_pool.as_ref() { - // TODO: don't do an error. return None - None => Ok(None), + None => Err(Web3ProxyError::NoDatabase), Some(redis_pool) => { - let redis_conn = redis_pool.get().await?; + // TODO: add a From for this + let redis_conn = redis_pool.get().await.context("redis pool error")?; - Ok(Some(redis_conn)) + Ok(redis_conn) } } } @@ -1458,7 +1452,7 @@ impl Web3ProxyApp { let f = async move { match app.redis_conn().await { - Ok(Some(mut redis_conn)) => { + Ok(mut redis_conn) => { let hashed_tx_hash = Bytes::from(keccak256(salted_tx_hash.as_bytes())); @@ -1469,7 +1463,7 @@ impl Web3ProxyApp { .zadd(recent_tx_hash_key, hashed_tx_hash.to_string(), now) .await?; } - Ok(None) => {} + Err(Web3ProxyError::NoDatabase) => {}, Err(err) => { warn!( "unable to save stats for eth_sendRawTransaction: {:?}", diff --git a/web3_proxy/src/bin/web3_proxy_cli/user_import.rs b/web3_proxy/src/bin/web3_proxy_cli/user_import.rs index 59835ac2..482d981e 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/user_import.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/user_import.rs @@ -125,7 +125,7 @@ impl UserImportSubCommand { for import_u in us.into_iter() { // first, check if a user already exists with this address if let Some(existing_u) = user::Entity::find() - .filter(user::Column::Address.eq(import_u.address)) + .filter(user::Column::Address.eq(import_u.address.clone())) .one(db_conn) .await? { diff --git a/web3_proxy/src/errors.rs b/web3_proxy/src/errors.rs index bb4a910d..76f11cd9 100644 --- a/web3_proxy/src/errors.rs +++ b/web3_proxy/src/errors.rs @@ -14,9 +14,11 @@ use axum::{ use derive_more::{Display, Error, From}; use ethers::prelude::ContractError; use http::header::InvalidHeaderValue; +use http::uri::InvalidUri; use ipnet::AddrParseError; use migration::sea_orm::DbErr; use redis_rate_limiter::redis::RedisError; +use redis_rate_limiter::RedisPoolError; use reqwest::header::ToStrError; use rust_decimal::Error as DecimalError; use serde::Serialize; @@ -63,6 +65,7 @@ pub enum Web3ProxyError { HdrRecord(hdrhistogram::errors::RecordError), Headers(headers::Error), HeaderToString(ToStrError), + HttpUri(InvalidUri), Hyper(hyper::Error), InfluxDb2Request(influxdb2::RequestError), #[display(fmt = "{} > {}", min, max)] @@ -112,6 +115,7 @@ pub enum Web3ProxyError { }, NotFound, NotImplemented, + NoVolatileRedisDatabase, OriginRequired, #[error(ignore)] #[from(ignore)] @@ -124,6 +128,7 @@ pub enum Web3ProxyError { #[display(fmt = "{:?}, {:?}", _0, _1)] RateLimited(Authorization, Option), Redis(RedisError), + RedisDeadpool(RedisPoolError), RefererRequired, #[display(fmt = "{:?}", _0)] #[error(ignore)] @@ -355,6 +360,17 @@ impl Web3ProxyError { }, ) } + Self::HttpUri(err) => { + trace!("HttpUri {:#?}", err); + ( + StatusCode::BAD_REQUEST, + JsonRpcErrorData { + message: err.to_string().into(), + code: StatusCode::BAD_REQUEST.as_u16().into(), + data: None, + }, + ) + } Self::Hyper(err) => { warn!("hyper err={:#?}", err); ( @@ -461,6 +477,18 @@ impl Web3ProxyError { }, ) } + Self::RedisDeadpool(err) => { + error!("redis deadpool err={:#?}", err); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcErrorData { + // TODO: is it safe to expose our io error strings? + message: err.to_string().into(), + code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), + data: None, + }, + ) + } Self::UnknownReferralCode => { trace!("UnknownReferralCode"); ( @@ -616,6 +644,17 @@ impl Web3ProxyError { }, ) } + Self::NoVolatileRedisDatabase => { + error!("no volatile redis database configured"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcErrorData { + message: "no volatile redis database configured!".into(), + code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), + data: None, + }, + ) + } Self::NoServersSynced => { warn!("NoServersSynced"); ( diff --git a/web3_proxy/src/frontend/admin.rs b/web3_proxy/src/frontend/admin.rs index 98cd3c28..a99c1b11 100644 --- a/web3_proxy/src/frontend/admin.rs +++ b/web3_proxy/src/frontend/admin.rs @@ -175,17 +175,7 @@ pub async fn admin_login_get( let issued_at = OffsetDateTime::now_utc(); let expiration_time = issued_at.add(Duration::new(expire_seconds as i64, 0)); - // The admin user is the one that basically logs in, on behalf of the user - // This will generate a login id for the admin, which we will be caching ... - // I suppose with this, the admin can be logged in to one session at a time - // let (caller, _semaphore) = app.bearer_is_authorized(bearer_token).await?; - - // Finally, check if the user is an admin. If he is, return "true" as the third triplet. - // TODO: consider wrapping the output in a struct, instead of a triplet - // TODO: Could try to merge this into the above query ... - // This query will fail if it's not the admin... - - // get the admin field ... + // get the admin's address let admin_address: Address = params .get("admin_address") .ok_or_else(|| { @@ -196,7 +186,7 @@ pub async fn admin_login_get( Web3ProxyError::BadRequest("Unable to parse admin_address as an Address".into()) })?; - // Fetch the user_address parameter from the login string ... (as who we want to be logging in ...) + // get the address who we want to be logging in as let user_address: Address = params .get("user_address") .ok_or_else(|| { @@ -208,29 +198,26 @@ pub async fn admin_login_get( })?; // We want to login to llamanodes.com - let login_domain = app + let domain = app .config .login_domain .as_deref() .unwrap_or("llamanodes.com"); - // Also there must basically be a token, that says that one admin logins _as a user_. - // I'm not yet fully sure how to handle with that logic specifically ... + let message_domain = domain.parse()?; + // TODO: don't unwrap + let message_uri = format!("https://{}/", domain).parse().unwrap(); + // TODO: get most of these from the app config - // TODO: Let's check again who the message needs to be signed by; - // if the message does not have to be signed by the user, include the user ... let message = Message { - // TODO: don't unwrap - // TODO: accept a login_domain from the request? - domain: login_domain.parse().unwrap(), + domain: message_domain, // the admin needs to sign the message, not the imitated user address: admin_address.to_fixed_bytes(), // TODO: config for statement - statement: Some("🦙🦙🦙🦙🦙".to_string()), - // TODO: don't unwrap - uri: format!("https://{}/", login_domain).parse().unwrap(), + statement: Some("👑👑👑👑👑".to_string()), + uri: message_uri, version: siwe::Version::V1, - chain_id: 1, + chain_id: app.config.chain_id, expiration_time: Some(expiration_time.into()), issued_at: issued_at.into(), nonce: nonce.to_string(), @@ -239,16 +226,14 @@ pub async fn admin_login_get( resources: vec![], }; - let db_conn = app.db_conn().web3_context("login requires a database")?; - let db_replica = app - .db_replica() - .web3_context("login requires a replica database")?; + let db_conn = app.db_conn()?; + let db_replica = app.db_replica()?; // delete ALL expired rows. let now = Utc::now(); let delete_result = pending_login::Entity::delete_many() .filter(pending_login::Column::ExpiresAt.lte(now)) - .exec(&db_conn) + .exec(db_conn) .await?; // TODO: emit a stat? if this is high something weird might be happening @@ -283,7 +268,7 @@ pub async fn admin_login_get( ..Default::default() }; trail - .save(&db_conn) + .save(db_conn) .await .web3_context("saving user's pending_login")?; @@ -308,7 +293,7 @@ pub async fn admin_login_get( }; user_pending_login - .save(&db_conn) + .save(db_conn) .await .web3_context("saving an admin trail pre login")?; @@ -385,9 +370,7 @@ pub async fn admin_login_post( })?; // fetch the message we gave them from our database - let db_replica = app - .db_replica() - .web3_context("Getting database connection")?; + let db_replica = app.db_replica()?; // massage type for the db let login_nonce_uuid: Uuid = login_nonce.clone().into(); @@ -434,9 +417,7 @@ pub async fn admin_login_post( .await? .web3_context("admin address was not found!")?; - let db_conn = app - .db_conn() - .web3_context("deleting expired pending logins requires a db")?; + let db_conn = app.db_conn()?; // Add a message that the admin has logged in // Note that the admin is trying to log in as this user @@ -448,7 +429,7 @@ pub async fn admin_login_post( ..Default::default() }; trail - .save(&db_conn) + .save(db_conn) .await .web3_context("saving an admin trail post login")?; @@ -497,15 +478,11 @@ pub async fn admin_login_post( }; user_login - .save(&db_conn) + .save(db_conn) .await .web3_context("saving user login")?; - if let Err(err) = user_pending_login - .into_active_model() - .delete(&db_conn) - .await - { + if let Err(err) = user_pending_login.into_active_model().delete(db_conn).await { warn!("Failed to delete nonce:{}: {}", login_nonce.0, err); } @@ -521,13 +498,11 @@ pub async fn admin_logout_post( ) -> Web3ProxyResponse { let user_bearer = UserBearerToken::try_from(bearer)?; - let db_conn = app - .db_conn() - .web3_context("database needed for user logout")?; + let db_conn = app.db_conn()?; if let Err(err) = login::Entity::delete_many() .filter(login::Column::BearerToken.eq(user_bearer.uuid())) - .exec(&db_conn) + .exec(db_conn) .await { debug!("Failed to delete {}: {}", user_bearer.redis_key(), err); @@ -538,7 +513,7 @@ pub async fn admin_logout_post( // also delete any expired logins let delete_result = login::Entity::delete_many() .filter(login::Column::ExpiresAt.lte(now)) - .exec(&db_conn) + .exec(db_conn) .await; debug!("Deleted expired logins: {:?}", delete_result); @@ -546,7 +521,7 @@ pub async fn admin_logout_post( // also delete any expired pending logins let delete_result = login::Entity::delete_many() .filter(login::Column::ExpiresAt.lte(now)) - .exec(&db_conn) + .exec(db_conn) .await; debug!("Deleted expired pending logins: {:?}", delete_result); diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 99b63806..0226670f 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -859,7 +859,7 @@ pub async fn ip_is_authorized( let f = async move { let now = Utc::now().timestamp(); - if let Some(mut redis_conn) = app.redis_conn().await? { + if let Ok(mut redis_conn) = app.redis_conn().await { let salt = app .config .public_recent_ips_salt @@ -923,7 +923,7 @@ pub async fn key_is_authorized( let f = async move { let now = Utc::now().timestamp(); - if let Some(mut redis_conn) = app.redis_conn().await? { + if let Ok(mut redis_conn) = app.redis_conn().await { let salt = app .config .public_recent_ips_salt @@ -1027,9 +1027,7 @@ impl Web3ProxyApp { let semaphore_permit = semaphore.acquire_owned().await?; // get the attached address from the database for the given auth_token. - let db_replica = self - .db_replica() - .web3_context("checking if bearer token is authorized")?; + let db_replica = self.db_replica()?; let user_bearer_uuid: Uuid = user_bearer_token.into(); @@ -1054,7 +1052,7 @@ impl Web3ProxyApp { // we don't care about user agent or origin or referer let authorization = Authorization::external( &self.config.allowed_origin_requests_per_period, - self.db_conn(), + self.db_conn().ok().cloned(), &ip, None, proxy_mode, @@ -1110,7 +1108,7 @@ impl Web3ProxyApp { ) -> Web3ProxyResult { if ip.is_loopback() { // TODO: localhost being unlimited should be optional - let authorization = Authorization::internal(self.db_conn())?; + let authorization = Authorization::internal(self.db_conn().ok().cloned())?; return Ok(RateLimitResult::Allowed(authorization, None)); } @@ -1119,7 +1117,7 @@ impl Web3ProxyApp { // they do check origin because we can override rate limits for some origins let authorization = Authorization::external( allowed_origin_requests_per_period, - self.db_conn(), + self.db_conn().ok().cloned(), ip, origin, proxy_mode, @@ -1181,9 +1179,7 @@ impl Web3ProxyApp { Ok(x) => self .user_balance_cache .try_get_with(x, async move { - let db_replica = self - .db_replica() - .web3_context("Getting database replica connection")?; + let db_replica = self.db_replica()?; loop { match balance::Entity::find() @@ -1201,8 +1197,7 @@ impl Web3ProxyApp { } None => { // no balance row. make one now - let db_conn = - self.db_conn().web3_context("Getting database connection")?; + let db_conn = self.db_conn()?; let balance_entry = balance::ActiveModel { id: sea_orm::NotSet, @@ -1219,7 +1214,7 @@ impl Web3ProxyApp { )]) .to_owned(), ) - .exec(&db_conn) + .exec(db_conn) .await .web3_context("creating empty balance row for existing user")?; @@ -1243,9 +1238,7 @@ impl Web3ProxyApp { .try_get_with_by_ref(rpc_secret_key, async move { // trace!(?rpc_secret_key, "user cache miss"); - let db_replica = self - .db_replica() - .web3_context("Getting database connection")?; + let db_replica = self.db_replica()?; // TODO: join the user table to this to return the User? we don't always need it // TODO: join on secondary users @@ -1322,7 +1315,7 @@ impl Web3ProxyApp { let user_model = user::Entity::find_by_id(rpc_key_model.user_id) .one(db_replica.as_ref()) .await? - .context( + .web3_context( "user model was not found, but every rpc_key should have a user", )?; @@ -1331,7 +1324,7 @@ impl Web3ProxyApp { ) .one(db_replica.as_ref()) .await? - .context( + .web3_context( "related user tier not found, but every user should have a tier", )?; @@ -1351,7 +1344,7 @@ impl Web3ProxyApp { user_tier::Entity::find_by_id(downgrade_user_tier) .one(db_replica.as_ref()) .await? - .context(format!( + .web3_context(format!( "downgrade user tier ({}) is missing!", downgrade_user_tier ))?; @@ -1411,7 +1404,7 @@ impl Web3ProxyApp { let authorization = Authorization::try_new( authorization_checks, - self.db_conn(), + self.db_conn().ok().cloned(), ip, origin, referer, diff --git a/web3_proxy/src/frontend/users/authentication.rs b/web3_proxy/src/frontend/users/authentication.rs index bd4319d8..c76e5416 100644 --- a/web3_proxy/src/frontend/users/authentication.rs +++ b/web3_proxy/src/frontend/users/authentication.rs @@ -74,24 +74,24 @@ pub async fn user_login_get( .parse() .or(Err(Web3ProxyError::ParseAddressError))?; - let login_domain = app + let domain = app .config .login_domain .clone() .unwrap_or_else(|| "llamanodes.com".to_string()); + let message_domain = domain.parse().unwrap(); + let message_uri = format!("https://{}/", domain).parse().unwrap(); + // TODO: get most of these from the app config let message = Message { - // TODO: don't unwrap - // TODO: accept a login_domain from the request? - domain: login_domain.parse().unwrap(), + domain: message_domain, address: user_address.to_fixed_bytes(), // TODO: config for statement statement: Some("🦙🦙🦙🦙🦙".to_string()), - // TODO: don't unwrap - uri: format!("https://{}/", login_domain).parse().unwrap(), + uri: message_uri, version: siwe::Version::V1, - chain_id: 1, + chain_id: app.config.chain_id, expiration_time: Some(expiration_time.into()), issued_at: issued_at.into(), nonce: nonce.to_string(), @@ -100,13 +100,13 @@ pub async fn user_login_get( resources: vec![], }; - let db_conn = app.db_conn().web3_context("login requires a database")?; + let db_conn = app.db_conn()?; // delete ALL expired rows. let now = Utc::now(); let _ = pending_login::Entity::delete_many() .filter(pending_login::Column::ExpiresAt.lte(now)) - .exec(&db_conn) + .exec(db_conn) .await?; // massage types to fit in the database. sea-orm does not make this very elegant @@ -127,7 +127,7 @@ pub async fn user_login_get( }; user_pending_login - .save(&db_conn) + .save(db_conn) .await .web3_context("saving user's pending_login")?; @@ -236,9 +236,7 @@ pub async fn user_login_post( let login_nonce = UserBearerToken::from_str(&their_msg.nonce)?; // fetch the message we gave them from our database - let db_replica = app - .db_replica() - .web3_context("Getting database connection")?; + let db_replica = app.db_replica()?; let user_pending_login = pending_login::Entity::find() .filter(pending_login::Column::Nonce.eq(Uuid::from(login_nonce.clone()))) @@ -270,7 +268,7 @@ pub async fn user_login_post( .one(db_replica.as_ref()) .await?; - let db_conn = app.db_conn().web3_context("login requires a db")?; + let db_conn = app.db_conn()?; let (caller, user_rpc_keys, status_code) = match caller { None => { @@ -357,7 +355,7 @@ pub async fn user_login_post( // the user is already registered let user_rpc_keys = rpc_key::Entity::find() .filter(rpc_key::Column::UserId.eq(caller.id)) - .all(&db_conn) + .all(db_conn) .await .web3_context("failed loading user's key")?; @@ -397,15 +395,11 @@ pub async fn user_login_post( }; user_login - .save(&db_conn) + .save(db_conn) .await .web3_context("saving user login")?; - if let Err(err) = user_pending_login - .into_active_model() - .delete(&db_conn) - .await - { + if let Err(err) = user_pending_login.into_active_model().delete(db_conn).await { error!("Failed to delete nonce:{}: {}", login_nonce, err); } @@ -420,13 +414,11 @@ pub async fn user_logout_post( ) -> Web3ProxyResponse { let user_bearer = UserBearerToken::try_from(bearer)?; - let db_conn = app - .db_conn() - .web3_context("database needed for user logout")?; + let db_conn = app.db_conn()?; if let Err(err) = login::Entity::delete_many() .filter(login::Column::BearerToken.eq(user_bearer.uuid())) - .exec(&db_conn) + .exec(db_conn) .await { warn!("Failed to delete {}: {}", user_bearer.redis_key(), err); @@ -437,7 +429,7 @@ pub async fn user_logout_post( // also delete any expired logins let delete_result = login::Entity::delete_many() .filter(login::Column::ExpiresAt.lte(now)) - .exec(&db_conn) + .exec(db_conn) .await; trace!("Deleted expired logins: {:?}", delete_result); @@ -445,7 +437,7 @@ pub async fn user_logout_post( // also delete any expired pending logins let delete_result = login::Entity::delete_many() .filter(login::Column::ExpiresAt.lte(now)) - .exec(&db_conn) + .exec(db_conn) .await; trace!("Deleted expired pending logins: {:?}", delete_result); diff --git a/web3_proxy/src/frontend/users/payment.rs b/web3_proxy/src/frontend/users/payment.rs index d9ce453d..5e4e735d 100644 --- a/web3_proxy/src/frontend/users/payment.rs +++ b/web3_proxy/src/frontend/users/payment.rs @@ -45,7 +45,7 @@ pub async fn user_balance_get( ) -> Web3ProxyResponse { let (_user, _semaphore) = app.bearer_is_authorized(bearer).await?; - let db_replica = app.db_replica().context("Getting database connection")?; + let db_replica = app.db_replica()?; // Just return the balance for the user let user_balance = balance::Entity::find() @@ -73,7 +73,7 @@ pub async fn user_deposits_get( ) -> Web3ProxyResponse { let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; - let db_replica = app.db_replica().context("Getting database connection")?; + let db_replica = app.db_replica()?; // Filter by user ... let receipts = increase_on_chain_balance_receipt::Entity::find() @@ -114,7 +114,7 @@ pub async fn user_balance_post( let (_, semaphore) = app.bearer_is_authorized(bearer).await?; // TODO: is handling this as internal fine? - let authorization = Web3ProxyAuthorization::internal(app.db_conn())?; + let authorization = Web3ProxyAuthorization::internal(app.db_conn().ok().cloned())?; (authorization, Some(semaphore)) } else if let Some(InsecureClientIp(ip)) = ip { @@ -136,7 +136,7 @@ pub async fn user_balance_post( Web3ProxyError::BadRequest(format!("unable to parse tx_hash: {}", err).into()) })?; - let db_conn = app.db_conn().context("query_user_stats needs a db")?; + let db_conn = app.db_conn()?; let authorization = Arc::new(authorization); @@ -172,7 +172,7 @@ pub async fn user_balance_post( true }; - let uncle_hashes = find_uncles.all(&db_conn).await?; + let uncle_hashes = find_uncles.all(db_conn).await?; let uncle_hashes: HashSet<_> = uncle_hashes .into_iter() diff --git a/web3_proxy/src/frontend/users/referral.rs b/web3_proxy/src/frontend/users/referral.rs index 3038228f..cab1398d 100644 --- a/web3_proxy/src/frontend/users/referral.rs +++ b/web3_proxy/src/frontend/users/referral.rs @@ -35,9 +35,7 @@ pub async fn user_referral_link_get( // First get the bearer token and check if the user is logged in let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; - let db_replica = app - .db_replica() - .context("getting replica db for user's revert logs")?; + let db_replica = app.db_replica()?; // Then get the referral token. If one doesn't exist, create one let user_referrer = referrer::Entity::find() @@ -49,7 +47,7 @@ pub async fn user_referral_link_get( Some(x) => (x.referral_code, StatusCode::OK), None => { // Connect to the database for writes - let db_conn = app.db_conn().context("getting db_conn")?; + let db_conn = app.db_conn()?; let referral_code = ReferralCode::default().to_string(); @@ -58,7 +56,7 @@ pub async fn user_referral_link_get( referral_code: sea_orm::ActiveValue::Set(referral_code.clone()), ..Default::default() }; - referrer_entry.save(&db_conn).await?; + referrer_entry.save(db_conn).await?; (referral_code, StatusCode::CREATED) } @@ -82,9 +80,7 @@ pub async fn user_used_referral_stats( // First get the bearer token and check if the user is logged in let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; - let db_replica = app - .db_replica() - .context("getting replica db for user's revert logs")?; + let db_replica = app.db_replica()?; // Get all referral records associated with this user let referrals = referee::Entity::find() @@ -142,9 +138,7 @@ pub async fn user_shared_referral_stats( // First get the bearer token and check if the user is logged in let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; - let db_replica = app - .db_replica() - .context("getting replica db for user's revert logs")?; + let db_replica = app.db_replica()?; // Get all referral records associated with this user let query_result = referrer::Entity::find() diff --git a/web3_proxy/src/frontend/users/rpc_keys.rs b/web3_proxy/src/frontend/users/rpc_keys.rs index 1dc45b2c..0ec84c70 100644 --- a/web3_proxy/src/frontend/users/rpc_keys.rs +++ b/web3_proxy/src/frontend/users/rpc_keys.rs @@ -31,9 +31,7 @@ pub async fn rpc_keys_get( ) -> Web3ProxyResponse { let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; - let db_replica = app - .db_replica() - .web3_context("db_replica is required to fetch a user's keys")?; + let db_replica = app.db_replica()?; // This is basically completely copied from sea-orm. Not optimal, but it keeps the format identical to before (while adding the final key) // We could also pack the below stuff into it's subfield, but then we would destroy the format. Both options are fine for now though @@ -162,9 +160,7 @@ pub async fn rpc_keys_management( let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; - let db_replica = app - .db_replica() - .web3_context("getting db for user's keys")?; + let db_replica = app.db_replica()?; let mut uk = match payload.key_id { Some(existing_key_id) => { @@ -342,9 +338,9 @@ pub async fn rpc_keys_management( } let uk = if uk.is_changed() { - let db_conn = app.db_conn().web3_context("login requires a db")?; + let db_conn = app.db_conn()?; - uk.save(&db_conn) + uk.save(db_conn) .await .web3_context("Failed saving user key")? } else { diff --git a/web3_proxy/src/frontend/users/stats.rs b/web3_proxy/src/frontend/users/stats.rs index e8a7531c..fbee05aa 100644 --- a/web3_proxy/src/frontend/users/stats.rs +++ b/web3_proxy/src/frontend/users/stats.rs @@ -46,9 +46,7 @@ pub async fn user_revert_logs_get( response.insert("chain_id", json!(chain_id)); response.insert("query_start", json!(query_start.timestamp() as u64)); - let db_replica = app - .db_replica() - .web3_context("getting replica db for user's revert logs")?; + let db_replica = app.db_replica()?; let uks = rpc_key::Entity::find() .filter(rpc_key::Column::UserId.eq(user.id)) diff --git a/web3_proxy/src/frontend/users/subuser.rs b/web3_proxy/src/frontend/users/subuser.rs index 2707e5dc..83614fac 100644 --- a/web3_proxy/src/frontend/users/subuser.rs +++ b/web3_proxy/src/frontend/users/subuser.rs @@ -35,9 +35,7 @@ pub async fn get_keys_as_subuser( // First, authenticate let (subuser, _semaphore) = app.bearer_is_authorized(bearer).await?; - let db_replica = app - .db_replica() - .context("getting replica db for user's revert logs")?; + let db_replica = app.db_replica()?; // TODO: JOIN over RPC_KEY, SUBUSER, PRIMARY_USER and return these items @@ -102,9 +100,7 @@ pub async fn get_subusers( // First, authenticate let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; - let db_replica = app - .db_replica() - .context("getting replica db for user's revert logs")?; + let db_replica = app.db_replica()?; let rpc_key: u64 = params .remove("key_id") @@ -176,9 +172,7 @@ pub async fn modify_subuser( // First, authenticate let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; - let db_replica = app - .db_replica() - .context("getting replica db for user's revert logs")?; + let db_replica = app.db_replica()?; trace!("Parameters are: {:?}", params); @@ -262,7 +256,7 @@ pub async fn modify_subuser( } // TODO: There is a good chunk of duplicate logic as login-post. Consider refactoring ... - let db_conn = app.db_conn().web3_context("login requires a db")?; + let db_conn = app.db_conn()?; let (subuser, _subuser_rpc_keys, _status_code) = match subuser { None => { let txn = db_conn.begin().await?; @@ -344,12 +338,12 @@ pub async fn modify_subuser( let mut active_subuser_entry_secondary_user = secondary_user.into_active_model(); if !keep_subuser { // Remove the user - active_subuser_entry_secondary_user.delete(&db_conn).await?; + active_subuser_entry_secondary_user.delete(db_conn).await?; action = "removed"; } else { // Just change the role active_subuser_entry_secondary_user.role = sea_orm::Set(new_role.clone()); - active_subuser_entry_secondary_user.save(&db_conn).await?; + active_subuser_entry_secondary_user.save(db_conn).await?; action = "role modified"; } } diff --git a/web3_proxy/src/http_params.rs b/web3_proxy/src/http_params.rs index 9f058ede..4939210b 100644 --- a/web3_proxy/src/http_params.rs +++ b/web3_proxy/src/http_params.rs @@ -43,8 +43,7 @@ pub async fn get_user_id_from_params( let user_login = login::Entity::find() .filter(login::Column::BearerToken.eq(user_bearer_token.uuid())) .one(db_replica.as_ref()) - .await - .context("database error while querying for user")? + .await? .ok_or(Web3ProxyError::AccessDenied)?; // if expired, delete ALL expired logins diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 7c16f9eb..68182f92 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -194,7 +194,7 @@ impl Web3Rpcs { return None; } - let db_conn = app.db_conn(); + let db_conn = app.db_conn().ok().cloned(); let http_client = app.http_client.clone(); let vredis_pool = app.vredis_pool.clone(); diff --git a/web3_proxy/src/stats/db_queries.rs b/web3_proxy/src/stats/db_queries.rs index a7cdffb8..ba6ced2b 100644 --- a/web3_proxy/src/stats/db_queries.rs +++ b/web3_proxy/src/stats/db_queries.rs @@ -5,7 +5,6 @@ use crate::http_params::{ get_chain_id_from_params, get_page_from_params, get_query_start_from_params, get_query_window_seconds_from_params, get_user_id_from_params, }; -use anyhow::Context; use axum::response::IntoResponse; use axum::Json; use axum::{ @@ -62,19 +61,13 @@ pub async fn query_user_stats<'a>( params: &'a HashMap, stat_response_type: StatType, ) -> Web3ProxyResponse { - let db_conn = app.db_conn().context("query_user_stats needs a db")?; - let db_replica = app - .db_replica() - .context("query_user_stats needs a db replica")?; - let mut redis_conn = app - .redis_conn() - .await - .context("query_user_stats had a redis connection error")? - .context("query_user_stats needs a redis")?; + let db_conn = app.db_conn()?; + let db_replica = app.db_replica()?; + let mut redis_conn = app.redis_conn().await?; // get the user id first. if it is 0, we should use a cache on the app let user_id = - get_user_id_from_params(&mut redis_conn, &db_conn, &db_replica, bearer, params).await?; + get_user_id_from_params(&mut redis_conn, db_conn, db_replica, bearer, params).await?; // get the query window seconds now so that we can pick a cache with a good TTL // TODO: for now though, just do one cache. its easier let query_window_seconds = get_query_window_seconds_from_params(params)?; diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs index ef471ee4..c47f4dc4 100644 --- a/web3_proxy/src/stats/influxdb_queries.rs +++ b/web3_proxy/src/stats/influxdb_queries.rs @@ -46,9 +46,7 @@ pub async fn query_user_stats<'a>( )); } - let db_replica = app - .db_replica() - .context("query_user_stats needs a db replica")?; + let db_replica = app.db_replica()?; // TODO: have a getter for this. do we need a connection pool on it? let influxdb_client = app