remove bearer token semaphore. it isn't really necesary

we can come back to rate limiting authenticated endpoints later
This commit is contained in:
Bryan Stitt 2023-07-03 16:54:25 -07:00
parent c1fc7d7a6a
commit a64e32b085
15 changed files with 38 additions and 74 deletions

@ -23,7 +23,6 @@ use crate::rpcs::one::Web3Rpc;
use crate::rpcs::provider::{connect_http, EthersHttpProvider};
use crate::rpcs::transactions::TxStatus;
use crate::stats::{AppStat, StatBuffer};
use crate::user_token::UserBearerToken;
use anyhow::Context;
use axum::http::StatusCode;
use chrono::Utc;
@ -83,8 +82,6 @@ pub type UserBalanceCache = Cache<NonZeroU64, Arc<RwLock<Balance>>>;
pub struct Web3ProxyApp {
/// Send requests to the best server available
pub balanced_rpcs: Arc<Web3Rpcs>,
/// concurrent/parallel application request limits for authenticated users
pub bearer_token_semaphores: Cache<UserBearerToken, Arc<Semaphore>>,
/// Send 4337 Abstraction Bundler requests to one of these servers
pub bundler_4337_rpcs: Option<Arc<Web3Rpcs>>,
/// application config
@ -501,11 +498,7 @@ impl Web3ProxyApp {
let max_users = 20_000;
// create semaphores for concurrent connection limits
// TODO: how can we implement time til idle?
// TODO: what should tti be for semaphores?
let bearer_token_semaphores = CacheBuilder::new(max_users)
.name("bearer_token_semaphores")
.build();
// TODO: time-to-idle on these. need to make sure the arcs aren't anywhere though. so maybe arc isn't correct and it should be refs
let ip_semaphores = CacheBuilder::new(max_users).name("ip_semaphores").build();
let user_semaphores = CacheBuilder::new(max_users).name("user_semaphores").build();
@ -594,7 +587,6 @@ impl Web3ProxyApp {
let app = Self {
balanced_rpcs,
bearer_token_semaphores,
bundler_4337_rpcs,
config: top_config.app.clone(),
db_conn,

@ -122,11 +122,6 @@ pub struct AppConfig {
/// do not serve any requests if the best known block is behind the best known block by more than this many blocks.
pub max_head_block_lag: Option<U64>,
/// Rate limit for bearer token authenticated entrypoints.
/// This is separate from the rpc limits.
#[serde(default = "default_bearer_token_max_concurrent_requests")]
pub bearer_token_max_concurrent_requests: u64,
/// Rate limit for the login entrypoint.
/// This is separate from the rpc limits.
#[serde(default = "default_login_rate_limit_per_period")]
@ -212,11 +207,6 @@ fn default_min_synced_rpcs() -> usize {
1
}
/// Having a low amount of concurrent requests for bearer tokens keeps us from hammering the database.
fn default_bearer_token_max_concurrent_requests() -> u64 {
2
}
/// Having a low amount of requests per period (usually minute) for login is safest.
fn default_login_rate_limit_per_period() -> u64 {
10

@ -55,16 +55,14 @@ pub async fn admin_increase_balance(
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
Json(payload): Json<AdminIncreaseBalancePost>,
) -> Web3ProxyResponse {
let (caller, _semaphore) = app.bearer_is_authorized(bearer).await?;
let caller_id = caller.id;
let caller = app.bearer_is_authorized(bearer).await?;
// Establish connections
let txn = app.db_transaction().await?;
// Check if the caller is an admin (if not, return early)
let admin_entry: admin::Model = admin::Entity::find()
.filter(admin::Column::UserId.eq(caller_id))
.filter(admin::Column::UserId.eq(caller.id))
.one(&txn)
.await?
.ok_or_else(|| Web3ProxyError::AccessDenied("not an admin".into()))?;

@ -1010,24 +1010,10 @@ impl Web3ProxyApp {
/// Verify that the given bearer token and address are allowed to take the specified action.
/// This includes concurrent request limiting.
/// keep the semaphore alive until the user's request is entirely complete
pub async fn bearer_is_authorized(
&self,
bearer: Bearer,
) -> Web3ProxyResult<(user::Model, OwnedSemaphorePermit)> {
pub async fn bearer_is_authorized(&self, bearer: Bearer) -> Web3ProxyResult<user::Model> {
// get the user id for this bearer token
let user_bearer_token = UserBearerToken::try_from(bearer)?;
// limit concurrent requests
let semaphore = self
.bearer_token_semaphores
.get_with_by_ref(&user_bearer_token, async move {
let s = Semaphore::new(self.config.bearer_token_max_concurrent_requests as usize);
Arc::new(s)
})
.await;
let semaphore_permit = semaphore.acquire_owned().await?;
// get the attached address from the database for the given auth_token.
let db_replica = self.db_replica()?;
@ -1041,7 +1027,7 @@ impl Web3ProxyApp {
.web3_context("fetching user from db by bearer token")?
.web3_context("unknown bearer token")?;
Ok((user, semaphore_permit))
Ok(user)
}
pub async fn rate_limit_login(

@ -196,7 +196,6 @@ async fn _status(app: Arc<Web3ProxyApp>) -> (StatusCode, &'static str, Bytes) {
"balanced_rpcs": app.balanced_rpcs,
"bundler_4337_rpcs": app.bundler_4337_rpcs,
"caches": [
MokaCacheSerializer(&app.bearer_token_semaphores),
MokaCacheSerializer(&app.ip_semaphores),
MokaCacheSerializer(&app.jsonrpc_response_cache),
MokaCacheSerializer(&app.rpc_secret_key_cache),

@ -31,7 +31,7 @@ pub async fn user_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer_token)): TypedHeader<Authorization<Bearer>>,
) -> Web3ProxyResponse {
let (user, _semaphore) = app.bearer_is_authorized(bearer_token).await?;
let user = app.bearer_is_authorized(bearer_token).await?;
Ok(Json(user).into_response())
}
@ -51,7 +51,7 @@ pub async fn user_post(
TypedHeader(Authorization(bearer_token)): TypedHeader<Authorization<Bearer>>,
Json(payload): Json<UserPost>,
) -> Web3ProxyResponse {
let (user, _semaphore) = app.bearer_is_authorized(bearer_token).await?;
let user = app.bearer_is_authorized(bearer_token).await?;
let user_id = user.id;

@ -43,19 +43,23 @@ pub async fn user_balance_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
) -> Web3ProxyResponse {
let (_user, _semaphore) = app.bearer_is_authorized(bearer).await?;
let user = app.bearer_is_authorized(bearer).await?;
let db_replica = app.db_replica()?;
// Just return the balance for the user
let user_balance = balance::Entity::find()
.filter(balance::Column::UserId.eq(_user.id))
let user_balance_row = balance::Entity::find()
.filter(balance::Column::UserId.eq(user.id))
.one(db_replica.as_ref())
.await?
.map(|x| x.total_deposits - x.total_spent_outside_free_tier)
.unwrap_or_default();
let user_balance =
user_balance_row.total_deposits - user_balance_row.total_spent_outside_free_tier;
let response = json!({
"total_deposits": user_balance_row.total_deposits,
"total_spent_outside_free_tier": user_balance_row.total_spent_outside_free_tier,
"total_spent": user_balance_row.total_spent_including_free_tier,
"balance": user_balance,
});
@ -71,7 +75,7 @@ pub async fn user_deposits_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
) -> Web3ProxyResponse {
let (user, _semaphore) = app.bearer_is_authorized(bearer).await?;
let user = app.bearer_is_authorized(bearer).await?;
let db_replica = app.db_replica()?;
@ -110,17 +114,13 @@ pub async fn user_balance_post(
bearer: Option<TypedHeader<Authorization<Bearer>>>,
) -> Web3ProxyResponse {
// rate limit by bearer token **OR** IP address
let (authorization, _semaphore) = if let Some(TypedHeader(Authorization(bearer))) = bearer {
let (_, semaphore) = app.bearer_is_authorized(bearer).await?;
let authorization = if let Some(TypedHeader(Authorization(bearer))) = bearer {
app.bearer_is_authorized(bearer).await?;
// TODO: is handling this as internal fine?
let authorization = Web3ProxyAuthorization::internal(app.db_conn().ok().cloned())?;
(authorization, Some(semaphore))
Web3ProxyAuthorization::internal(app.db_conn().ok().cloned())?
} else if let Some(InsecureClientIp(ip)) = ip {
let authorization = login_is_authorized(&app, ip).await?;
(authorization, None)
login_is_authorized(&app, ip).await?
} else {
return Err(Web3ProxyError::AccessDenied("no bearer token or ip".into()));
};

@ -31,7 +31,7 @@ pub async fn user_stripe_deposits_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
) -> Web3ProxyResponse {
let (user, _semaphore) = app.bearer_is_authorized(bearer).await?;
let user = app.bearer_is_authorized(bearer).await?;
let db_replica = app.db_replica().context("Getting database connection")?;

@ -33,7 +33,7 @@ pub async fn user_referral_link_get(
Query(_params): Query<HashMap<String, String>>,
) -> Web3ProxyResponse {
// First get the bearer token and check if the user is logged in
let (user, _semaphore) = app.bearer_is_authorized(bearer).await?;
let user = app.bearer_is_authorized(bearer).await?;
let db_replica = app.db_replica()?;
@ -78,7 +78,7 @@ pub async fn user_used_referral_stats(
Query(_params): Query<HashMap<String, String>>,
) -> Web3ProxyResponse {
// First get the bearer token and check if the user is logged in
let (user, _semaphore) = app.bearer_is_authorized(bearer).await?;
let user = app.bearer_is_authorized(bearer).await?;
let db_replica = app.db_replica()?;
@ -136,7 +136,7 @@ pub async fn user_shared_referral_stats(
Query(_params): Query<HashMap<String, String>>,
) -> Web3ProxyResponse {
// First get the bearer token and check if the user is logged in
let (user, _semaphore) = app.bearer_is_authorized(bearer).await?;
let user = app.bearer_is_authorized(bearer).await?;
let db_replica = app.db_replica()?;

@ -29,7 +29,7 @@ pub async fn rpc_keys_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
) -> Web3ProxyResponse {
let (user, _semaphore) = app.bearer_is_authorized(bearer).await?;
let user = app.bearer_is_authorized(bearer).await?;
let db_replica = app.db_replica()?;
@ -125,7 +125,7 @@ pub async fn rpc_keys_delete(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
) -> Web3ProxyResponse {
let (_user, _semaphore) = app.bearer_is_authorized(bearer).await?;
let _user = app.bearer_is_authorized(bearer).await?;
// TODO: think about how cascading deletes and billing should work
Err(Web3ProxyError::NotImplemented("rpc_keys_delete".into()))
@ -158,7 +158,7 @@ pub async fn rpc_keys_management(
) -> Web3ProxyResponse {
// TODO: is there a way we can know if this is a PUT or POST? right now we can modify or create keys with either. though that probably doesn't matter
let (user, _semaphore) = app.bearer_is_authorized(bearer).await?;
let user = app.bearer_is_authorized(bearer).await?;
let db_replica = app.db_replica()?;

@ -30,7 +30,7 @@ pub async fn user_revert_logs_get(
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
Query(params): Query<HashMap<String, String>>,
) -> Web3ProxyResponse {
let (user, _semaphore) = app.bearer_is_authorized(bearer).await?;
let user = app.bearer_is_authorized(bearer).await?;
let chain_id = get_chain_id_from_params(app.as_ref(), &params)?;
let query_start = get_query_start_from_params(&params)?;

@ -33,7 +33,7 @@ pub async fn get_keys_as_subuser(
Query(_params): Query<HashMap<String, String>>,
) -> Web3ProxyResponse {
// First, authenticate
let (subuser, _semaphore) = app.bearer_is_authorized(bearer).await?;
let subuser = app.bearer_is_authorized(bearer).await?;
let db_replica = app.db_replica()?;
@ -98,7 +98,7 @@ pub async fn get_subusers(
Query(mut params): Query<HashMap<String, String>>,
) -> Web3ProxyResponse {
// First, authenticate
let (user, _semaphore) = app.bearer_is_authorized(bearer).await?;
let user = app.bearer_is_authorized(bearer).await?;
let db_replica = app.db_replica()?;
@ -170,7 +170,7 @@ pub async fn modify_subuser(
Query(mut params): Query<HashMap<String, String>>,
) -> Web3ProxyResponse {
// First, authenticate
let (user, _semaphore) = app.bearer_is_authorized(bearer).await?;
let user = app.bearer_is_authorized(bearer).await?;
let db_replica = app.db_replica()?;

@ -32,13 +32,13 @@ pub async fn query_user_stats<'a>(
params: &'a HashMap<String, String>,
stat_response_type: StatType,
) -> Web3ProxyResponse {
let (caller_user, _semaphore) = match bearer {
let caller_user = match bearer {
Some(TypedHeader(Authorization(bearer))) => {
let (user, semaphore) = app.bearer_is_authorized(bearer).await?;
let user = app.bearer_is_authorized(bearer).await?;
(Some(user), Some(semaphore))
Some(user)
}
None => (None, None),
None => None,
};
// Return an error if the bearer is **not** set, but the StatType is Detailed

@ -200,7 +200,7 @@ impl TestApp {
// TODO: make sure mysql is actually ready for connections
sleep(Duration::from_secs(1)).await;
info!(%db_url, elapsed=%start.elapsed().as_secs_f32(), "db is ready for connections. Migrating now...");
info!(%db_url, elapsed=%start.elapsed().as_secs_f32(), "db post is open. Migrating now...");
// try to migrate
let start = Instant::now();
@ -218,7 +218,7 @@ impl TestApp {
}
Err(err) => {
// not connected. sleep and then try again
warn!(?err, "unable to migrate db");
warn!(?err, "unable to migrate db. retrying in 1 second");
sleep(Duration::from_secs(1)).await;
}
}

@ -168,7 +168,6 @@ async fn test_admin_grant_credits() {
.send()
.await
.unwrap();
info!("bug is on the line above. it never returns");
info!(?increase_balance_response, "http response");
let increase_balance_response = increase_balance_response