semafore cleanup

This commit is contained in:
Bryan Stitt 2022-10-25 04:01:41 +00:00
parent 4ccbcb8af4
commit 618bfeb861
4 changed files with 44 additions and 34 deletions

View File

@ -192,7 +192,7 @@ These are roughly in order of completition
- need an flume::watch on unflushed stats that we can subscribe to. wait for it to flip to true - need an flume::watch on unflushed stats that we can subscribe to. wait for it to flip to true
- [x] don't use unix timestamps for response_millis since leap seconds will confuse it - [x] don't use unix timestamps for response_millis since leap seconds will confuse it
- [x] config to allow origins even on the anonymous endpoints - [x] config to allow origins even on the anonymous endpoints
- [ ] log errors to sentry - [x] send logs to sentry
- [-] ability to domain lock or ip lock said key - [-] ability to domain lock or ip lock said key
- the code to check the database and use these entries already exists, but users don't have a way to set them - the code to check the database and use these entries already exists, but users don't have a way to set them
- [-] new endpoints for users (not totally sure about the exact paths, but these features are all needed): - [-] new endpoints for users (not totally sure about the exact paths, but these features are all needed):
@ -353,7 +353,6 @@ in another repo: event subscriber
- [ ] test /api/getGaugesmethod - [ ] test /api/getGaugesmethod
- usually times out after vercel's 60 second timeout - usually times out after vercel's 60 second timeout
- one time got: Error invalid Json response "" - one time got: Error invalid Json response ""
- [ ] send logs to sentry
- [ ] i think all the async methods in ethers need tracing instrument. something like `cfgif(tracing, tracing::instrument)` - [ ] i think all the async methods in ethers need tracing instrument. something like `cfgif(tracing, tracing::instrument)`
- if they do that, i think my request_id will show up on their logs - if they do that, i think my request_id will show up on their logs
- [ ] page that prints a graphviz dotfile of the blockchain - [ ] page that prints a graphviz dotfile of the blockchain

View File

@ -92,6 +92,12 @@ pub struct AppConfig {
#[serde(default = "default_public_requests_per_minute")] #[serde(default = "default_public_requests_per_minute")]
pub public_requests_per_minute: Option<u64>, pub public_requests_per_minute: Option<u64>,
/// Concurrent request limit for anonymous users.
/// Some(0) = block all requests
/// None = allow all requests
#[serde(default = "default_public_max_concurrent_requests")]
pub public_max_concurrent_requests: Option<usize>,
/// Rate limit for the login entrypoint. /// Rate limit for the login entrypoint.
/// This is separate from the rpc limits. /// This is separate from the rpc limits.
#[serde(default = "default_login_rate_limit_per_minute")] #[serde(default = "default_login_rate_limit_per_minute")]
@ -129,6 +135,13 @@ fn default_min_synced_rpcs() -> usize {
1 1
} }
/// 0 blocks anonymous requests.
/// None allows unlimited concurrent requests
// TODO: what is a reasonable default?
fn default_public_max_concurrent_requests() -> Option<usize> {
Some(5)
}
/// 0 blocks anonymous requests by default. /// 0 blocks anonymous requests by default.
fn default_public_requests_per_minute() -> Option<u64> { fn default_public_requests_per_minute() -> Option<u64> {
Some(0) Some(0)

View File

@ -35,7 +35,7 @@ pub enum UserKey {
pub enum RateLimitResult { pub enum RateLimitResult {
/// contains the IP of the anonymous user /// contains the IP of the anonymous user
/// TODO: option inside or outside the arc? /// TODO: option inside or outside the arc?
AllowedIp(IpAddr, OwnedSemaphorePermit), AllowedIp(IpAddr, Option<OwnedSemaphorePermit>),
/// contains the user_key_id of an authenticated user /// contains the user_key_id of an authenticated user
AllowedUser(UserKeyData, Option<OwnedSemaphorePermit>), AllowedUser(UserKeyData, Option<OwnedSemaphorePermit>),
/// contains the IP and retry_at of the anonymous user /// contains the IP and retry_at of the anonymous user
@ -262,10 +262,10 @@ impl Display for &AuthorizedRequest {
pub async fn login_is_authorized( pub async fn login_is_authorized(
app: &Web3ProxyApp, app: &Web3ProxyApp,
ip: IpAddr, ip: IpAddr,
) -> Result<(AuthorizedRequest, OwnedSemaphorePermit), FrontendErrorResponse> { ) -> Result<AuthorizedRequest, FrontendErrorResponse> {
// TODO: i think we could write an `impl From` for this // TODO: i think we could write an `impl From` for this
// TODO: move this to an AuthorizedUser extrator // TODO: move this to an AuthorizedUser extrator
let (ip, semaphore) = match app.rate_limit_login(ip).await? { let (ip, _semaphore) = match app.rate_limit_login(ip).await? {
RateLimitResult::AllowedIp(x, semaphore) => (x, semaphore), RateLimitResult::AllowedIp(x, semaphore) => (x, semaphore),
RateLimitResult::RateLimitedIp(x, retry_at) => { RateLimitResult::RateLimitedIp(x, retry_at) => {
return Err(FrontendErrorResponse::RateLimitedIp(x, retry_at)); return Err(FrontendErrorResponse::RateLimitedIp(x, retry_at));
@ -274,7 +274,7 @@ pub async fn login_is_authorized(
x => unimplemented!("rate_limit_login shouldn't ever see these: {:?}", x), x => unimplemented!("rate_limit_login shouldn't ever see these: {:?}", x),
}; };
Ok((AuthorizedRequest::Ip(ip, None), semaphore)) Ok(AuthorizedRequest::Ip(ip, None))
} }
// TODO: where should we use this? // TODO: where should we use this?
@ -315,7 +315,7 @@ pub async fn ip_is_authorized(
// TODO: i think we could write an `impl From` for this // TODO: i think we could write an `impl From` for this
// TODO: move this to an AuthorizedUser extrator // TODO: move this to an AuthorizedUser extrator
let (ip, semaphore) = match app.rate_limit_by_ip(ip, origin.as_ref()).await? { let (ip, semaphore) = match app.rate_limit_by_ip(ip, origin.as_ref()).await? {
RateLimitResult::AllowedIp(ip, semaphore) => (ip, Some(semaphore)), RateLimitResult::AllowedIp(ip, semaphore) => (ip, semaphore),
RateLimitResult::RateLimitedIp(x, retry_at) => { RateLimitResult::RateLimitedIp(x, retry_at) => {
return Err(FrontendErrorResponse::RateLimitedIp(x, retry_at)); return Err(FrontendErrorResponse::RateLimitedIp(x, retry_at));
} }
@ -354,24 +354,28 @@ pub async fn key_is_authorized(
} }
impl Web3ProxyApp { impl Web3ProxyApp {
pub async fn ip_semaphore(&self, ip: IpAddr) -> anyhow::Result<OwnedSemaphorePermit> { pub async fn ip_semaphore(&self, ip: IpAddr) -> anyhow::Result<Option<OwnedSemaphorePermit>> {
let semaphore = self if let Some(max_concurrent_requests) = self.config.public_max_concurrent_requests {
.ip_semaphores let semaphore = self
.get_with(ip, async move { .ip_semaphores
// TODO: get semaphore size from app config .get_with(ip, async move {
let s = Semaphore::const_new(10); // TODO: set max_concurrent_requests dynamically based on load?
Arc::new(s) let s = Semaphore::const_new(max_concurrent_requests);
}) Arc::new(s)
.await; })
.await;
// if semaphore.available_permits() == 0 { // if semaphore.available_permits() == 0 {
// // TODO: concurrent limit hit! emit a stat? less important for anon users // // TODO: concurrent limit hit! emit a stat? less important for anon users
// // TODO: there is probably a race here // // TODO: there is probably a race here
// } // }
let semaphore_permit = semaphore.acquire_owned().await?; let semaphore_permit = semaphore.acquire_owned().await?;
Ok(semaphore_permit) Ok(Some(semaphore_permit))
} else {
Ok(None)
}
} }
pub async fn user_key_semaphore( pub async fn user_key_semaphore(
@ -404,13 +408,10 @@ impl Web3ProxyApp {
pub async fn rate_limit_login(&self, ip: IpAddr) -> anyhow::Result<RateLimitResult> { pub async fn rate_limit_login(&self, ip: IpAddr) -> anyhow::Result<RateLimitResult> {
// TODO: dry this up with rate_limit_by_key // TODO: dry this up with rate_limit_by_key
let semaphore = self.ip_semaphore(ip).await?; // TODO: do we ant semafores here?
if let Some(rate_limiter) = &self.login_rate_limiter { if let Some(rate_limiter) = &self.login_rate_limiter {
match rate_limiter.throttle_label(&ip.to_string(), None, 1).await { match rate_limiter.throttle_label(&ip.to_string(), None, 1).await {
Ok(RedisRateLimitResult::Allowed(_)) => { Ok(RedisRateLimitResult::Allowed(_)) => Ok(RateLimitResult::AllowedIp(ip, None)),
Ok(RateLimitResult::AllowedIp(ip, semaphore))
}
Ok(RedisRateLimitResult::RetryAt(retry_at, _)) => { Ok(RedisRateLimitResult::RetryAt(retry_at, _)) => {
// TODO: set headers so they know when they can retry // TODO: set headers so they know when they can retry
// TODO: debug or trace? // TODO: debug or trace?
@ -428,7 +429,7 @@ impl Web3ProxyApp {
// TODO: i really want axum to do this for us in a single place. // TODO: i really want axum to do this for us in a single place.
error!(?err, "login rate limiter is unhappy. allowing ip"); error!(?err, "login rate limiter is unhappy. allowing ip");
Ok(RateLimitResult::AllowedIp(ip, semaphore)) Ok(RateLimitResult::AllowedIp(ip, None))
} }
} }
} else { } else {

View File

@ -49,8 +49,7 @@ pub async fn user_login_get(
// TODO: allow ENS names here? // TODO: allow ENS names here?
Path(mut params): Path<HashMap<String, String>>, Path(mut params): Path<HashMap<String, String>>,
) -> FrontendResult { ) -> FrontendResult {
// give these named variables so that we drop them at the very end of this function login_is_authorized(&app, ip).await?;
let (_, _semaphore) = login_is_authorized(&app, ip).await?;
// at first i thought about checking that user_address is in our db // at first i thought about checking that user_address is in our db
// but theres no need to separate the registration and login flows // but theres no need to separate the registration and login flows
@ -162,8 +161,7 @@ pub async fn user_login_post(
Json(payload): Json<PostLogin>, Json(payload): Json<PostLogin>,
Query(query): Query<PostLoginQuery>, Query(query): Query<PostLoginQuery>,
) -> FrontendResult { ) -> FrontendResult {
// give these named variables so that we drop them at the very end of this function login_is_authorized(&app, ip).await?;
let (_, _semaphore) = login_is_authorized(&app, ip).await?;
if let Some(invite_code) = &app.config.invite_code { if let Some(invite_code) = &app.config.invite_code {
// we don't do per-user referral codes because we shouldn't collect what we don't need. // we don't do per-user referral codes because we shouldn't collect what we don't need.
@ -371,8 +369,7 @@ pub async fn user_profile_post(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
Json(payload): Json<PostUser>, Json(payload): Json<PostUser>,
) -> FrontendResult { ) -> FrontendResult {
// give these named variables so that we drop them at the very end of this function login_is_authorized(&app, ip).await?;
let (_, _semaphore) = login_is_authorized(&app, ip).await?;
let user = ProtectedAction::PostUser(payload.primary_address) let user = ProtectedAction::PostUser(payload.primary_address)
.verify(app.as_ref(), bearer_token) .verify(app.as_ref(), bearer_token)