From a67b85a3279bbc4d28975a4f4062492ccdfc07d5 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 25 Oct 2022 18:26:58 +0000 Subject: [PATCH] concurrency limits on bearer token actions --- web3_proxy/src/app.rs | 12 +++- web3_proxy/src/config.rs | 10 ++++ web3_proxy/src/frontend/authorization.rs | 8 +-- web3_proxy/src/frontend/users.rs | 70 +++++++++++++++++------- 4 files changed, 71 insertions(+), 29 deletions(-) diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index 37c86ae2..d67f8f6f 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -112,6 +112,8 @@ pub struct Web3ProxyApp { pub user_key_cache: Cache, pub user_key_semaphores: Cache, hashbrown::hash_map::DefaultHashBuilder>, pub ip_semaphores: Cache, hashbrown::hash_map::DefaultHashBuilder>, + pub bearer_token_semaphores: + Cache, hashbrown::hash_map::DefaultHashBuilder>, pub stat_sender: Option>, } @@ -396,6 +398,7 @@ impl Web3ProxyApp { ); // these two rate limiters can share the base limiter + // these are deferred rate limiters because we don't want redis network requests on the hot path // TODO: take cache_size from config frontend_ip_rate_limiter = Some(DeferredRateLimiter::::new( 10_000, @@ -407,7 +410,6 @@ impl Web3ProxyApp { 10_000, "key", rpc_rrl, None, )); - // don't defer this one because it will have a low request per peiod login_rate_limiter = Some(RedisRateLimiter::new( "web3_proxy", "login", @@ -454,12 +456,15 @@ impl Web3ProxyApp { // create semaphores for concurrent connection limits // TODO: what should tti be for semaphores? - let user_key_semaphores = Cache::builder() + let bearer_token_semaphores = Cache::builder() .time_to_idle(Duration::from_secs(120)) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new()); let ip_semaphores = Cache::builder() .time_to_idle(Duration::from_secs(120)) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new()); + let user_key_semaphores = Cache::builder() + .time_to_idle(Duration::from_secs(120)) + .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new()); let app = Self { config: top_config.app, @@ -477,8 +482,9 @@ impl Web3ProxyApp { app_metrics, open_request_handle_metrics, user_key_cache, - user_key_semaphores, + bearer_token_semaphores, ip_semaphores, + user_key_semaphores, stat_sender, }; diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 79328e26..a2f3b163 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -79,6 +79,11 @@ pub struct AppConfig { /// None = no code needed pub invite_code: Option, + /// Rate limit for bearer token authenticated entrypoints. + /// This is separate from the rpc limits. + #[serde(default = "default_bearer_token_max_concurrent_requests")] + pub bearer_token_max_concurrent_requests: u64, + /// Rate limit for the login entrypoint. /// This is separate from the rpc limits. #[serde(default = "default_login_rate_limit_per_minute")] @@ -148,6 +153,11 @@ fn default_public_requests_per_minute() -> Option { Some(0) } +/// Having a low amount of concurrent requests for bearer tokens keeps us from hammering the database. +fn default_bearer_token_max_concurrent_requests() -> u64 { + 2 +} + /// Having a low amount of requests per minute for login is safest. fn default_login_rate_limit_per_minute() -> u64 { 10 diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 1120c18e..66c40456 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -354,14 +354,12 @@ impl Web3ProxyApp { if let Some(max_concurrent_requests) = user_data.max_concurrent_requests { let semaphore = self .user_key_semaphores - .try_get_with(user_data.user_key_id, async move { + .get_with(user_data.user_key_id, async move { let s = Semaphore::new(max_concurrent_requests as usize); trace!("new semaphore for user_key_id {}", user_data.user_key_id); - Ok::<_, anyhow::Error>(Arc::new(s)) + Arc::new(s) }) - .await - // TODO: is this the best way to handle an arc - .map_err(|err| anyhow::anyhow!(err))?; + .await; // if semaphore.available_permits() == 0 { // // TODO: concurrent limit hit! emit a stat diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index 1e325970..9dae1ce1 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -25,6 +25,7 @@ use std::ops::Add; use std::str::FromStr; use std::sync::Arc; use time::{Duration, OffsetDateTime}; +use tokio::sync::Semaphore; use tracing::{info, warn}; use ulid::Ulid; @@ -351,11 +352,11 @@ pub async fn user_logout_post( Ok("goodbye".into_response()) } -/// the JSON input to the `post_user` handler -/// This handles updating +/// the JSON input to the `post_user` handler. #[derive(Deserialize)] pub struct UserProfilePost { primary_address: Address, + new_primary_address: Option
, // TODO: make sure the email address is valid. probably have a "verified" column in the database email: Option, } @@ -363,28 +364,35 @@ pub struct UserProfilePost { /// `POST /user/profile` -- modify the account connected to the bearer token in the `Authentication` header. #[debug_handler] pub async fn user_profile_post( - TypedHeader(Authorization(bearer_token)): TypedHeader>, - ClientIp(ip): ClientIp, Extension(app): Extension>, + TypedHeader(Authorization(bearer_token)): TypedHeader>, Json(payload): Json, ) -> FrontendResult { - login_is_authorized(&app, ip).await?; - - let user = ProtectedAction::PostUser(payload.primary_address) - .verify(app.as_ref(), bearer_token) + let user = ProtectedAction::UserProfilePost(payload.primary_address) + .authorize(app.as_ref(), bearer_token) .await?; let mut user: user::ActiveModel = user.into(); - // TODO: rate limit by user, too? + // TODO: require a message from the new address to finish the change + if let Some(new_primary_address) = payload.new_primary_address { + if new_primary_address.is_zero() { + // TODO: allow this if some other authentication method is set + return Err(anyhow::anyhow!("cannot clear primary address").into()); + } else { + let new_primary_address = Vec::from(new_primary_address.as_ref()); - // TODO: allow changing the primary address, too. require a message from the new address to finish the change + user.address = sea_orm::Set(new_primary_address) + } + } if let Some(x) = payload.email { // TODO: only Set if no change if x.is_empty() { user.email = sea_orm::Set(None); } else { + // TODO: do some basic validation + // TODO: don't set immediatly, send a confirmation email first user.email = sea_orm::Set(Some(x)); } } @@ -393,7 +401,8 @@ pub async fn user_profile_post( user.save(&db_conn).await?; - todo!("finish post_user"); + // TODO: what should this return? the user? + Ok("success".into_response()) } /// `GET /user/balance` -- Use a bearer token to get the user's balance and spend. @@ -411,7 +420,7 @@ pub async fn user_balance_get( todo!("user_balance_get"); } -/// `POST /user/balance` -- Manually process a confirmed txid to update a user's balance. +/// `POST /user/balance/:txhash` -- Manually process a confirmed txid to update a user's balance. /// /// We will subscribe to events to watch for any user deposits, but sometimes events can be missed. /// @@ -434,6 +443,10 @@ pub async fn user_keys_get( Extension(app): Extension>, TypedHeader(Authorization(bearer_token)): TypedHeader>, ) -> FrontendResult { + let user = ProtectedAction::UserKeysGet + .authorize(app.as_ref(), bearer_token) + .await?; + todo!("user_keys_get"); } @@ -507,16 +520,29 @@ pub async fn user_stats_aggregate_get( /// Handle authorization for a given address and bearer token. // TODO: what roles should exist? enum ProtectedAction { - PostUser(Address), + UserKeysGet, + UserProfilePost(Address), } impl ProtectedAction { /// Verify that the given bearer token and address are allowed to take the specified action. - async fn verify(self, app: &Web3ProxyApp, bearer: Bearer) -> anyhow::Result { + /// This includes concurrent request limiting. + async fn authorize(self, app: &Web3ProxyApp, bearer: Bearer) -> anyhow::Result { // get the attached address from redis for the given auth_token. let mut redis_conn = app.redis_conn().await?; - // TODO: move this to a helper function + // limit concurrent requests + let semaphore = app + .bearer_token_semaphores + .get_with(bearer.token().to_string(), async move { + let s = Semaphore::new(app.config.bearer_token_max_concurrent_requests as usize); + Arc::new(s) + }) + .await; + let _semaphore_permit = semaphore.acquire().await?; + + // get the user id for this bearer token + // TODO: move redis key building to a helper function let bearer_cache_key = format!("bearer:{}", bearer.token()); // TODO: move this to a helper function @@ -526,18 +552,20 @@ impl ProtectedAction { .context("fetching bearer cache key from redis")? .context("unknown bearer token")?; + // turn user id into a user let db_conn = app.db_conn().context("Getting database connection")?; - - // turn user key id into a user key - let user_data = user::Entity::find_by_id(user_id) + let user = user::Entity::find_by_id(user_id) .one(&db_conn) .await .context("fetching user from db by id")? .context("unknown user id")?; match self { - Self::PostUser(primary_address) => { - let user_address = Address::from_slice(&user_data.address); + Self::UserKeysGet => { + // no extra checks needed. bearer token gave us a user + } + Self::UserProfilePost(primary_address) => { + let user_address = Address::from_slice(&user.address); if user_address != primary_address { // TODO: check secondary users @@ -546,6 +574,6 @@ impl ProtectedAction { } } - Ok(user_data) + Ok(user) } }