concurrency limits on bearer token actions

This commit is contained in:
Bryan Stitt 2022-10-25 18:26:58 +00:00
parent 21956afe73
commit a67b85a327
4 changed files with 71 additions and 29 deletions

View File

@ -112,6 +112,8 @@ pub struct Web3ProxyApp {
pub user_key_cache: Cache<Ulid, UserKeyData, hashbrown::hash_map::DefaultHashBuilder>,
pub user_key_semaphores: Cache<u64, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
pub ip_semaphores: Cache<IpAddr, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
pub bearer_token_semaphores:
Cache<String, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
pub stat_sender: Option<flume::Sender<Web3ProxyStat>>,
}
@ -396,6 +398,7 @@ impl Web3ProxyApp {
);
// these two rate limiters can share the base limiter
// these are deferred rate limiters because we don't want redis network requests on the hot path
// TODO: take cache_size from config
frontend_ip_rate_limiter = Some(DeferredRateLimiter::<IpAddr>::new(
10_000,
@ -407,7 +410,6 @@ impl Web3ProxyApp {
10_000, "key", rpc_rrl, None,
));
// don't defer this one because it will have a low request per peiod
login_rate_limiter = Some(RedisRateLimiter::new(
"web3_proxy",
"login",
@ -454,12 +456,15 @@ impl Web3ProxyApp {
// create semaphores for concurrent connection limits
// TODO: what should tti be for semaphores?
let user_key_semaphores = Cache::builder()
let bearer_token_semaphores = Cache::builder()
.time_to_idle(Duration::from_secs(120))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new());
let ip_semaphores = Cache::builder()
.time_to_idle(Duration::from_secs(120))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new());
let user_key_semaphores = Cache::builder()
.time_to_idle(Duration::from_secs(120))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new());
let app = Self {
config: top_config.app,
@ -477,8 +482,9 @@ impl Web3ProxyApp {
app_metrics,
open_request_handle_metrics,
user_key_cache,
user_key_semaphores,
bearer_token_semaphores,
ip_semaphores,
user_key_semaphores,
stat_sender,
};

View File

@ -79,6 +79,11 @@ pub struct AppConfig {
/// None = no code needed
pub invite_code: Option<String>,
/// Rate limit for bearer token authenticated entrypoints.
/// This is separate from the rpc limits.
#[serde(default = "default_bearer_token_max_concurrent_requests")]
pub bearer_token_max_concurrent_requests: u64,
/// Rate limit for the login entrypoint.
/// This is separate from the rpc limits.
#[serde(default = "default_login_rate_limit_per_minute")]
@ -148,6 +153,11 @@ fn default_public_requests_per_minute() -> Option<u64> {
Some(0)
}
/// Having a low amount of concurrent requests for bearer tokens keeps us from hammering the database.
fn default_bearer_token_max_concurrent_requests() -> u64 {
2
}
/// Having a low amount of requests per minute for login is safest.
fn default_login_rate_limit_per_minute() -> u64 {
10

View File

@ -354,14 +354,12 @@ impl Web3ProxyApp {
if let Some(max_concurrent_requests) = user_data.max_concurrent_requests {
let semaphore = self
.user_key_semaphores
.try_get_with(user_data.user_key_id, async move {
.get_with(user_data.user_key_id, async move {
let s = Semaphore::new(max_concurrent_requests as usize);
trace!("new semaphore for user_key_id {}", user_data.user_key_id);
Ok::<_, anyhow::Error>(Arc::new(s))
Arc::new(s)
})
.await
// TODO: is this the best way to handle an arc
.map_err(|err| anyhow::anyhow!(err))?;
.await;
// if semaphore.available_permits() == 0 {
// // TODO: concurrent limit hit! emit a stat

View File

@ -25,6 +25,7 @@ use std::ops::Add;
use std::str::FromStr;
use std::sync::Arc;
use time::{Duration, OffsetDateTime};
use tokio::sync::Semaphore;
use tracing::{info, warn};
use ulid::Ulid;
@ -351,11 +352,11 @@ pub async fn user_logout_post(
Ok("goodbye".into_response())
}
/// the JSON input to the `post_user` handler
/// This handles updating
/// the JSON input to the `post_user` handler.
#[derive(Deserialize)]
pub struct UserProfilePost {
primary_address: Address,
new_primary_address: Option<Address>,
// TODO: make sure the email address is valid. probably have a "verified" column in the database
email: Option<String>,
}
@ -363,28 +364,35 @@ pub struct UserProfilePost {
/// `POST /user/profile` -- modify the account connected to the bearer token in the `Authentication` header.
#[debug_handler]
pub async fn user_profile_post(
TypedHeader(Authorization(bearer_token)): TypedHeader<Authorization<Bearer>>,
ClientIp(ip): ClientIp,
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer_token)): TypedHeader<Authorization<Bearer>>,
Json(payload): Json<UserProfilePost>,
) -> FrontendResult {
login_is_authorized(&app, ip).await?;
let user = ProtectedAction::PostUser(payload.primary_address)
.verify(app.as_ref(), bearer_token)
let user = ProtectedAction::UserProfilePost(payload.primary_address)
.authorize(app.as_ref(), bearer_token)
.await?;
let mut user: user::ActiveModel = user.into();
// TODO: rate limit by user, too?
// TODO: require a message from the new address to finish the change
if let Some(new_primary_address) = payload.new_primary_address {
if new_primary_address.is_zero() {
// TODO: allow this if some other authentication method is set
return Err(anyhow::anyhow!("cannot clear primary address").into());
} else {
let new_primary_address = Vec::from(new_primary_address.as_ref());
// TODO: allow changing the primary address, too. require a message from the new address to finish the change
user.address = sea_orm::Set(new_primary_address)
}
}
if let Some(x) = payload.email {
// TODO: only Set if no change
if x.is_empty() {
user.email = sea_orm::Set(None);
} else {
// TODO: do some basic validation
// TODO: don't set immediatly, send a confirmation email first
user.email = sea_orm::Set(Some(x));
}
}
@ -393,7 +401,8 @@ pub async fn user_profile_post(
user.save(&db_conn).await?;
todo!("finish post_user");
// TODO: what should this return? the user?
Ok("success".into_response())
}
/// `GET /user/balance` -- Use a bearer token to get the user's balance and spend.
@ -411,7 +420,7 @@ pub async fn user_balance_get(
todo!("user_balance_get");
}
/// `POST /user/balance` -- Manually process a confirmed txid to update a user's balance.
/// `POST /user/balance/:txhash` -- Manually process a confirmed txid to update a user's balance.
///
/// We will subscribe to events to watch for any user deposits, but sometimes events can be missed.
///
@ -434,6 +443,10 @@ pub async fn user_keys_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer_token)): TypedHeader<Authorization<Bearer>>,
) -> FrontendResult {
let user = ProtectedAction::UserKeysGet
.authorize(app.as_ref(), bearer_token)
.await?;
todo!("user_keys_get");
}
@ -507,16 +520,29 @@ pub async fn user_stats_aggregate_get(
/// Handle authorization for a given address and bearer token.
// TODO: what roles should exist?
enum ProtectedAction {
PostUser(Address),
UserKeysGet,
UserProfilePost(Address),
}
impl ProtectedAction {
/// Verify that the given bearer token and address are allowed to take the specified action.
async fn verify(self, app: &Web3ProxyApp, bearer: Bearer) -> anyhow::Result<user::Model> {
/// This includes concurrent request limiting.
async fn authorize(self, app: &Web3ProxyApp, bearer: Bearer) -> anyhow::Result<user::Model> {
// get the attached address from redis for the given auth_token.
let mut redis_conn = app.redis_conn().await?;
// TODO: move this to a helper function
// limit concurrent requests
let semaphore = app
.bearer_token_semaphores
.get_with(bearer.token().to_string(), async move {
let s = Semaphore::new(app.config.bearer_token_max_concurrent_requests as usize);
Arc::new(s)
})
.await;
let _semaphore_permit = semaphore.acquire().await?;
// get the user id for this bearer token
// TODO: move redis key building to a helper function
let bearer_cache_key = format!("bearer:{}", bearer.token());
// TODO: move this to a helper function
@ -526,18 +552,20 @@ impl ProtectedAction {
.context("fetching bearer cache key from redis")?
.context("unknown bearer token")?;
// turn user id into a user
let db_conn = app.db_conn().context("Getting database connection")?;
// turn user key id into a user key
let user_data = user::Entity::find_by_id(user_id)
let user = user::Entity::find_by_id(user_id)
.one(&db_conn)
.await
.context("fetching user from db by id")?
.context("unknown user id")?;
match self {
Self::PostUser(primary_address) => {
let user_address = Address::from_slice(&user_data.address);
Self::UserKeysGet => {
// no extra checks needed. bearer token gave us a user
}
Self::UserProfilePost(primary_address) => {
let user_address = Address::from_slice(&user.address);
if user_address != primary_address {
// TODO: check secondary users
@ -546,6 +574,6 @@ impl ProtectedAction {
}
}
Ok(user_data)
Ok(user)
}
}