diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 46dc2b9b..c528c443 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -23,7 +23,6 @@ use crate::rpcs::one::Web3Rpc; use crate::rpcs::provider::{connect_http, EthersHttpProvider}; use crate::rpcs::transactions::TxStatus; use crate::stats::{AppStat, StatBuffer}; -use crate::user_token::UserBearerToken; use anyhow::Context; use axum::http::StatusCode; use chrono::Utc; @@ -83,8 +82,6 @@ pub type UserBalanceCache = Cache>>; pub struct Web3ProxyApp { /// Send requests to the best server available pub balanced_rpcs: Arc, - /// concurrent/parallel application request limits for authenticated users - pub bearer_token_semaphores: Cache>, /// Send 4337 Abstraction Bundler requests to one of these servers pub bundler_4337_rpcs: Option>, /// application config @@ -501,11 +498,7 @@ impl Web3ProxyApp { let max_users = 20_000; // create semaphores for concurrent connection limits - // TODO: how can we implement time til idle? - // TODO: what should tti be for semaphores? - let bearer_token_semaphores = CacheBuilder::new(max_users) - .name("bearer_token_semaphores") - .build(); + // TODO: time-to-idle on these. need to make sure the arcs aren't anywhere though. so maybe arc isn't correct and it should be refs let ip_semaphores = CacheBuilder::new(max_users).name("ip_semaphores").build(); let user_semaphores = CacheBuilder::new(max_users).name("user_semaphores").build(); @@ -594,7 +587,6 @@ impl Web3ProxyApp { let app = Self { balanced_rpcs, - bearer_token_semaphores, bundler_4337_rpcs, config: top_config.app.clone(), db_conn, diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 15ce574c..ef3342b0 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -122,11 +122,6 @@ pub struct AppConfig { /// do not serve any requests if the best known block is behind the best known block by more than this many blocks. pub max_head_block_lag: Option, - /// Rate limit for bearer token authenticated entrypoints. - /// This is separate from the rpc limits. - #[serde(default = "default_bearer_token_max_concurrent_requests")] - pub bearer_token_max_concurrent_requests: u64, - /// Rate limit for the login entrypoint. /// This is separate from the rpc limits. #[serde(default = "default_login_rate_limit_per_period")] @@ -212,11 +207,6 @@ fn default_min_synced_rpcs() -> usize { 1 } -/// Having a low amount of concurrent requests for bearer tokens keeps us from hammering the database. -fn default_bearer_token_max_concurrent_requests() -> u64 { - 2 -} - /// Having a low amount of requests per period (usually minute) for login is safest. fn default_login_rate_limit_per_period() -> u64 { 10 diff --git a/web3_proxy/src/frontend/admin.rs b/web3_proxy/src/frontend/admin.rs index 92c8adad..9eca41a4 100644 --- a/web3_proxy/src/frontend/admin.rs +++ b/web3_proxy/src/frontend/admin.rs @@ -55,16 +55,14 @@ pub async fn admin_increase_balance( TypedHeader(Authorization(bearer)): TypedHeader>, Json(payload): Json, ) -> Web3ProxyResponse { - let (caller, _semaphore) = app.bearer_is_authorized(bearer).await?; - - let caller_id = caller.id; + let caller = app.bearer_is_authorized(bearer).await?; // Establish connections let txn = app.db_transaction().await?; // Check if the caller is an admin (if not, return early) let admin_entry: admin::Model = admin::Entity::find() - .filter(admin::Column::UserId.eq(caller_id)) + .filter(admin::Column::UserId.eq(caller.id)) .one(&txn) .await? .ok_or_else(|| Web3ProxyError::AccessDenied("not an admin".into()))?; diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 6be1a1fa..c89874d2 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -1010,24 +1010,10 @@ impl Web3ProxyApp { /// Verify that the given bearer token and address are allowed to take the specified action. /// This includes concurrent request limiting. /// keep the semaphore alive until the user's request is entirely complete - pub async fn bearer_is_authorized( - &self, - bearer: Bearer, - ) -> Web3ProxyResult<(user::Model, OwnedSemaphorePermit)> { + pub async fn bearer_is_authorized(&self, bearer: Bearer) -> Web3ProxyResult { // get the user id for this bearer token let user_bearer_token = UserBearerToken::try_from(bearer)?; - // limit concurrent requests - let semaphore = self - .bearer_token_semaphores - .get_with_by_ref(&user_bearer_token, async move { - let s = Semaphore::new(self.config.bearer_token_max_concurrent_requests as usize); - Arc::new(s) - }) - .await; - - let semaphore_permit = semaphore.acquire_owned().await?; - // get the attached address from the database for the given auth_token. let db_replica = self.db_replica()?; @@ -1041,7 +1027,7 @@ impl Web3ProxyApp { .web3_context("fetching user from db by bearer token")? .web3_context("unknown bearer token")?; - Ok((user, semaphore_permit)) + Ok(user) } pub async fn rate_limit_login( diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs index 4a77c144..06aa4c53 100644 --- a/web3_proxy/src/frontend/status.rs +++ b/web3_proxy/src/frontend/status.rs @@ -196,7 +196,6 @@ async fn _status(app: Arc) -> (StatusCode, &'static str, Bytes) { "balanced_rpcs": app.balanced_rpcs, "bundler_4337_rpcs": app.bundler_4337_rpcs, "caches": [ - MokaCacheSerializer(&app.bearer_token_semaphores), MokaCacheSerializer(&app.ip_semaphores), MokaCacheSerializer(&app.jsonrpc_response_cache), MokaCacheSerializer(&app.rpc_secret_key_cache), diff --git a/web3_proxy/src/frontend/users/mod.rs b/web3_proxy/src/frontend/users/mod.rs index 2cd61457..1caaf3a3 100644 --- a/web3_proxy/src/frontend/users/mod.rs +++ b/web3_proxy/src/frontend/users/mod.rs @@ -31,7 +31,7 @@ pub async fn user_get( Extension(app): Extension>, TypedHeader(Authorization(bearer_token)): TypedHeader>, ) -> Web3ProxyResponse { - let (user, _semaphore) = app.bearer_is_authorized(bearer_token).await?; + let user = app.bearer_is_authorized(bearer_token).await?; Ok(Json(user).into_response()) } @@ -51,7 +51,7 @@ pub async fn user_post( TypedHeader(Authorization(bearer_token)): TypedHeader>, Json(payload): Json, ) -> Web3ProxyResponse { - let (user, _semaphore) = app.bearer_is_authorized(bearer_token).await?; + let user = app.bearer_is_authorized(bearer_token).await?; let user_id = user.id; diff --git a/web3_proxy/src/frontend/users/payment.rs b/web3_proxy/src/frontend/users/payment.rs index 28a3a4da..c060b152 100644 --- a/web3_proxy/src/frontend/users/payment.rs +++ b/web3_proxy/src/frontend/users/payment.rs @@ -43,19 +43,23 @@ pub async fn user_balance_get( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, ) -> Web3ProxyResponse { - let (_user, _semaphore) = app.bearer_is_authorized(bearer).await?; + let user = app.bearer_is_authorized(bearer).await?; let db_replica = app.db_replica()?; - // Just return the balance for the user - let user_balance = balance::Entity::find() - .filter(balance::Column::UserId.eq(_user.id)) + let user_balance_row = balance::Entity::find() + .filter(balance::Column::UserId.eq(user.id)) .one(db_replica.as_ref()) .await? - .map(|x| x.total_deposits - x.total_spent_outside_free_tier) .unwrap_or_default(); + let user_balance = + user_balance_row.total_deposits - user_balance_row.total_spent_outside_free_tier; + let response = json!({ + "total_deposits": user_balance_row.total_deposits, + "total_spent_outside_free_tier": user_balance_row.total_spent_outside_free_tier, + "total_spent": user_balance_row.total_spent_including_free_tier, "balance": user_balance, }); @@ -71,7 +75,7 @@ pub async fn user_deposits_get( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, ) -> Web3ProxyResponse { - let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; + let user = app.bearer_is_authorized(bearer).await?; let db_replica = app.db_replica()?; @@ -110,17 +114,13 @@ pub async fn user_balance_post( bearer: Option>>, ) -> Web3ProxyResponse { // rate limit by bearer token **OR** IP address - let (authorization, _semaphore) = if let Some(TypedHeader(Authorization(bearer))) = bearer { - let (_, semaphore) = app.bearer_is_authorized(bearer).await?; + let authorization = if let Some(TypedHeader(Authorization(bearer))) = bearer { + app.bearer_is_authorized(bearer).await?; // TODO: is handling this as internal fine? - let authorization = Web3ProxyAuthorization::internal(app.db_conn().ok().cloned())?; - - (authorization, Some(semaphore)) + Web3ProxyAuthorization::internal(app.db_conn().ok().cloned())? } else if let Some(InsecureClientIp(ip)) = ip { - let authorization = login_is_authorized(&app, ip).await?; - - (authorization, None) + login_is_authorized(&app, ip).await? } else { return Err(Web3ProxyError::AccessDenied("no bearer token or ip".into())); }; diff --git a/web3_proxy/src/frontend/users/payment_stripe.rs b/web3_proxy/src/frontend/users/payment_stripe.rs index ec1d57fe..ad699efb 100644 --- a/web3_proxy/src/frontend/users/payment_stripe.rs +++ b/web3_proxy/src/frontend/users/payment_stripe.rs @@ -31,7 +31,7 @@ pub async fn user_stripe_deposits_get( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, ) -> Web3ProxyResponse { - let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; + let user = app.bearer_is_authorized(bearer).await?; let db_replica = app.db_replica().context("Getting database connection")?; diff --git a/web3_proxy/src/frontend/users/referral.rs b/web3_proxy/src/frontend/users/referral.rs index cab1398d..0e59fbec 100644 --- a/web3_proxy/src/frontend/users/referral.rs +++ b/web3_proxy/src/frontend/users/referral.rs @@ -33,7 +33,7 @@ pub async fn user_referral_link_get( Query(_params): Query>, ) -> Web3ProxyResponse { // First get the bearer token and check if the user is logged in - let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; + let user = app.bearer_is_authorized(bearer).await?; let db_replica = app.db_replica()?; @@ -78,7 +78,7 @@ pub async fn user_used_referral_stats( Query(_params): Query>, ) -> Web3ProxyResponse { // First get the bearer token and check if the user is logged in - let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; + let user = app.bearer_is_authorized(bearer).await?; let db_replica = app.db_replica()?; @@ -136,7 +136,7 @@ pub async fn user_shared_referral_stats( Query(_params): Query>, ) -> Web3ProxyResponse { // First get the bearer token and check if the user is logged in - let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; + let user = app.bearer_is_authorized(bearer).await?; let db_replica = app.db_replica()?; diff --git a/web3_proxy/src/frontend/users/rpc_keys.rs b/web3_proxy/src/frontend/users/rpc_keys.rs index bd642182..f725f616 100644 --- a/web3_proxy/src/frontend/users/rpc_keys.rs +++ b/web3_proxy/src/frontend/users/rpc_keys.rs @@ -29,7 +29,7 @@ pub async fn rpc_keys_get( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, ) -> Web3ProxyResponse { - let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; + let user = app.bearer_is_authorized(bearer).await?; let db_replica = app.db_replica()?; @@ -125,7 +125,7 @@ pub async fn rpc_keys_delete( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, ) -> Web3ProxyResponse { - let (_user, _semaphore) = app.bearer_is_authorized(bearer).await?; + let _user = app.bearer_is_authorized(bearer).await?; // TODO: think about how cascading deletes and billing should work Err(Web3ProxyError::NotImplemented("rpc_keys_delete".into())) @@ -158,7 +158,7 @@ pub async fn rpc_keys_management( ) -> Web3ProxyResponse { // TODO: is there a way we can know if this is a PUT or POST? right now we can modify or create keys with either. though that probably doesn't matter - let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; + let user = app.bearer_is_authorized(bearer).await?; let db_replica = app.db_replica()?; diff --git a/web3_proxy/src/frontend/users/stats.rs b/web3_proxy/src/frontend/users/stats.rs index fbee05aa..917311f3 100644 --- a/web3_proxy/src/frontend/users/stats.rs +++ b/web3_proxy/src/frontend/users/stats.rs @@ -30,7 +30,7 @@ pub async fn user_revert_logs_get( TypedHeader(Authorization(bearer)): TypedHeader>, Query(params): Query>, ) -> Web3ProxyResponse { - let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; + let user = app.bearer_is_authorized(bearer).await?; let chain_id = get_chain_id_from_params(app.as_ref(), ¶ms)?; let query_start = get_query_start_from_params(¶ms)?; diff --git a/web3_proxy/src/frontend/users/subuser.rs b/web3_proxy/src/frontend/users/subuser.rs index 83614fac..ced8f6b5 100644 --- a/web3_proxy/src/frontend/users/subuser.rs +++ b/web3_proxy/src/frontend/users/subuser.rs @@ -33,7 +33,7 @@ pub async fn get_keys_as_subuser( Query(_params): Query>, ) -> Web3ProxyResponse { // First, authenticate - let (subuser, _semaphore) = app.bearer_is_authorized(bearer).await?; + let subuser = app.bearer_is_authorized(bearer).await?; let db_replica = app.db_replica()?; @@ -98,7 +98,7 @@ pub async fn get_subusers( Query(mut params): Query>, ) -> Web3ProxyResponse { // First, authenticate - let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; + let user = app.bearer_is_authorized(bearer).await?; let db_replica = app.db_replica()?; @@ -170,7 +170,7 @@ pub async fn modify_subuser( Query(mut params): Query>, ) -> Web3ProxyResponse { // First, authenticate - let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; + let user = app.bearer_is_authorized(bearer).await?; let db_replica = app.db_replica()?; diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs index 4c0d80ad..5cec8d73 100644 --- a/web3_proxy/src/stats/influxdb_queries.rs +++ b/web3_proxy/src/stats/influxdb_queries.rs @@ -32,13 +32,13 @@ pub async fn query_user_stats<'a>( params: &'a HashMap, stat_response_type: StatType, ) -> Web3ProxyResponse { - let (caller_user, _semaphore) = match bearer { + let caller_user = match bearer { Some(TypedHeader(Authorization(bearer))) => { - let (user, semaphore) = app.bearer_is_authorized(bearer).await?; + let user = app.bearer_is_authorized(bearer).await?; - (Some(user), Some(semaphore)) + Some(user) } - None => (None, None), + None => None, }; // Return an error if the bearer is **not** set, but the StatType is Detailed diff --git a/web3_proxy/tests/common/app.rs b/web3_proxy/tests/common/app.rs index 9a77b2e1..cf5a8ecd 100644 --- a/web3_proxy/tests/common/app.rs +++ b/web3_proxy/tests/common/app.rs @@ -200,7 +200,7 @@ impl TestApp { // TODO: make sure mysql is actually ready for connections sleep(Duration::from_secs(1)).await; - info!(%db_url, elapsed=%start.elapsed().as_secs_f32(), "db is ready for connections. Migrating now..."); + info!(%db_url, elapsed=%start.elapsed().as_secs_f32(), "db post is open. Migrating now..."); // try to migrate let start = Instant::now(); @@ -218,7 +218,7 @@ impl TestApp { } Err(err) => { // not connected. sleep and then try again - warn!(?err, "unable to migrate db"); + warn!(?err, "unable to migrate db. retrying in 1 second"); sleep(Duration::from_secs(1)).await; } } diff --git a/web3_proxy/tests/test_admins.rs b/web3_proxy/tests/test_admins.rs index 977c2b31..08736805 100644 --- a/web3_proxy/tests/test_admins.rs +++ b/web3_proxy/tests/test_admins.rs @@ -168,7 +168,6 @@ async fn test_admin_grant_credits() { .send() .await .unwrap(); - info!("bug is on the line above. it never returns"); info!(?increase_balance_response, "http response"); let increase_balance_response = increase_balance_response