From 618bfeb861eed93ca5fcabfc79e6a8a1c343933a Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 25 Oct 2022 04:01:41 +0000 Subject: [PATCH] semafore cleanup --- TODO.md | 3 +- web3_proxy/src/config.rs | 13 ++++++ web3_proxy/src/frontend/authorization.rs | 53 ++++++++++++------------ web3_proxy/src/frontend/users.rs | 9 ++-- 4 files changed, 44 insertions(+), 34 deletions(-) diff --git a/TODO.md b/TODO.md index 7c907315..eafea965 100644 --- a/TODO.md +++ b/TODO.md @@ -192,7 +192,7 @@ These are roughly in order of completition - need an flume::watch on unflushed stats that we can subscribe to. wait for it to flip to true - [x] don't use unix timestamps for response_millis since leap seconds will confuse it - [x] config to allow origins even on the anonymous endpoints -- [ ] log errors to sentry +- [x] send logs to sentry - [-] ability to domain lock or ip lock said key - the code to check the database and use these entries already exists, but users don't have a way to set them - [-] new endpoints for users (not totally sure about the exact paths, but these features are all needed): @@ -353,7 +353,6 @@ in another repo: event subscriber - [ ] test /api/getGaugesmethod - usually times out after vercel's 60 second timeout - one time got: Error invalid Json response "" -- [ ] send logs to sentry - [ ] i think all the async methods in ethers need tracing instrument. something like `cfgif(tracing, tracing::instrument)` - if they do that, i think my request_id will show up on their logs - [ ] page that prints a graphviz dotfile of the blockchain diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 57106449..f986b4c1 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -92,6 +92,12 @@ pub struct AppConfig { #[serde(default = "default_public_requests_per_minute")] pub public_requests_per_minute: Option, + /// Concurrent request limit for anonymous users. + /// Some(0) = block all requests + /// None = allow all requests + #[serde(default = "default_public_max_concurrent_requests")] + pub public_max_concurrent_requests: Option, + /// Rate limit for the login entrypoint. /// This is separate from the rpc limits. #[serde(default = "default_login_rate_limit_per_minute")] @@ -129,6 +135,13 @@ fn default_min_synced_rpcs() -> usize { 1 } +/// 0 blocks anonymous requests. +/// None allows unlimited concurrent requests +// TODO: what is a reasonable default? +fn default_public_max_concurrent_requests() -> Option { + Some(5) +} + /// 0 blocks anonymous requests by default. fn default_public_requests_per_minute() -> Option { Some(0) diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index baa7769f..6a8a2a70 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -35,7 +35,7 @@ pub enum UserKey { pub enum RateLimitResult { /// contains the IP of the anonymous user /// TODO: option inside or outside the arc? - AllowedIp(IpAddr, OwnedSemaphorePermit), + AllowedIp(IpAddr, Option), /// contains the user_key_id of an authenticated user AllowedUser(UserKeyData, Option), /// contains the IP and retry_at of the anonymous user @@ -262,10 +262,10 @@ impl Display for &AuthorizedRequest { pub async fn login_is_authorized( app: &Web3ProxyApp, ip: IpAddr, -) -> Result<(AuthorizedRequest, OwnedSemaphorePermit), FrontendErrorResponse> { +) -> Result { // TODO: i think we could write an `impl From` for this // TODO: move this to an AuthorizedUser extrator - let (ip, semaphore) = match app.rate_limit_login(ip).await? { + let (ip, _semaphore) = match app.rate_limit_login(ip).await? { RateLimitResult::AllowedIp(x, semaphore) => (x, semaphore), RateLimitResult::RateLimitedIp(x, retry_at) => { return Err(FrontendErrorResponse::RateLimitedIp(x, retry_at)); @@ -274,7 +274,7 @@ pub async fn login_is_authorized( x => unimplemented!("rate_limit_login shouldn't ever see these: {:?}", x), }; - Ok((AuthorizedRequest::Ip(ip, None), semaphore)) + Ok(AuthorizedRequest::Ip(ip, None)) } // TODO: where should we use this? @@ -315,7 +315,7 @@ pub async fn ip_is_authorized( // TODO: i think we could write an `impl From` for this // TODO: move this to an AuthorizedUser extrator let (ip, semaphore) = match app.rate_limit_by_ip(ip, origin.as_ref()).await? { - RateLimitResult::AllowedIp(ip, semaphore) => (ip, Some(semaphore)), + RateLimitResult::AllowedIp(ip, semaphore) => (ip, semaphore), RateLimitResult::RateLimitedIp(x, retry_at) => { return Err(FrontendErrorResponse::RateLimitedIp(x, retry_at)); } @@ -354,24 +354,28 @@ pub async fn key_is_authorized( } impl Web3ProxyApp { - pub async fn ip_semaphore(&self, ip: IpAddr) -> anyhow::Result { - let semaphore = self - .ip_semaphores - .get_with(ip, async move { - // TODO: get semaphore size from app config - let s = Semaphore::const_new(10); - Arc::new(s) - }) - .await; + pub async fn ip_semaphore(&self, ip: IpAddr) -> anyhow::Result> { + if let Some(max_concurrent_requests) = self.config.public_max_concurrent_requests { + let semaphore = self + .ip_semaphores + .get_with(ip, async move { + // TODO: set max_concurrent_requests dynamically based on load? + let s = Semaphore::const_new(max_concurrent_requests); + Arc::new(s) + }) + .await; - // if semaphore.available_permits() == 0 { - // // TODO: concurrent limit hit! emit a stat? less important for anon users - // // TODO: there is probably a race here - // } + // if semaphore.available_permits() == 0 { + // // TODO: concurrent limit hit! emit a stat? less important for anon users + // // TODO: there is probably a race here + // } - let semaphore_permit = semaphore.acquire_owned().await?; + let semaphore_permit = semaphore.acquire_owned().await?; - Ok(semaphore_permit) + Ok(Some(semaphore_permit)) + } else { + Ok(None) + } } pub async fn user_key_semaphore( @@ -404,13 +408,10 @@ impl Web3ProxyApp { pub async fn rate_limit_login(&self, ip: IpAddr) -> anyhow::Result { // TODO: dry this up with rate_limit_by_key - let semaphore = self.ip_semaphore(ip).await?; - + // TODO: do we ant semafores here? if let Some(rate_limiter) = &self.login_rate_limiter { match rate_limiter.throttle_label(&ip.to_string(), None, 1).await { - Ok(RedisRateLimitResult::Allowed(_)) => { - Ok(RateLimitResult::AllowedIp(ip, semaphore)) - } + Ok(RedisRateLimitResult::Allowed(_)) => Ok(RateLimitResult::AllowedIp(ip, None)), Ok(RedisRateLimitResult::RetryAt(retry_at, _)) => { // TODO: set headers so they know when they can retry // TODO: debug or trace? @@ -428,7 +429,7 @@ impl Web3ProxyApp { // TODO: i really want axum to do this for us in a single place. error!(?err, "login rate limiter is unhappy. allowing ip"); - Ok(RateLimitResult::AllowedIp(ip, semaphore)) + Ok(RateLimitResult::AllowedIp(ip, None)) } } } else { diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index fdf054e5..49b7e507 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -49,8 +49,7 @@ pub async fn user_login_get( // TODO: allow ENS names here? Path(mut params): Path>, ) -> FrontendResult { - // give these named variables so that we drop them at the very end of this function - let (_, _semaphore) = login_is_authorized(&app, ip).await?; + login_is_authorized(&app, ip).await?; // at first i thought about checking that user_address is in our db // but theres no need to separate the registration and login flows @@ -162,8 +161,7 @@ pub async fn user_login_post( Json(payload): Json, Query(query): Query, ) -> FrontendResult { - // give these named variables so that we drop them at the very end of this function - let (_, _semaphore) = login_is_authorized(&app, ip).await?; + login_is_authorized(&app, ip).await?; if let Some(invite_code) = &app.config.invite_code { // we don't do per-user referral codes because we shouldn't collect what we don't need. @@ -371,8 +369,7 @@ pub async fn user_profile_post( Extension(app): Extension>, Json(payload): Json, ) -> FrontendResult { - // give these named variables so that we drop them at the very end of this function - let (_, _semaphore) = login_is_authorized(&app, ip).await?; + login_is_authorized(&app, ip).await?; let user = ProtectedAction::PostUser(payload.primary_address) .verify(app.as_ref(), bearer_token)