From 0c7a2b05ec975ac35862aa41852955e10498773c Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Tue, 27 Sep 2022 02:01:45 +0000
Subject: [PATCH] start adding semaphores

---
 TODO.md                                   |   4 +-
 web3_proxy/src/app.rs                     |  20 +++-
 web3_proxy/src/frontend/authorization.rs  | 107 +++++++++++++++++-----
 web3_proxy/src/frontend/rpc_proxy_http.rs |   5 +-
 web3_proxy/src/frontend/rpc_proxy_ws.rs   |   5 +-
 web3_proxy/src/frontend/status.rs         |   6 +-
 6 files changed, 111 insertions(+), 36 deletions(-)

diff --git a/TODO.md b/TODO.md
index b0f27cd2..12a0a5e2 100644
--- a/TODO.md
+++ b/TODO.md
@@ -182,7 +182,6 @@ These are roughly in order of completion
 - [ ] endpoint for creating/modifying api keys and their advanced security features
 - [ ] BUG: i think if all backend servers stop, the server doesn't properly reconnect. It appears to stop listening on 8854, but not shut down.
 - [ ] option to rotate api key
-- [ ] read the cookie key from a file. easy to re-use and no giant blob of hex in our app config
 - [ ] if no bearer token found in redis (likely because it expired), send 401 unauthorized
 - [ ] user create script should allow a description field
 - [ ] user create script should allow multiple keys per user
@@ -191,6 +190,7 @@ These are not yet ordered.
+- [ ] implement filters and other unimplemented rpc methods
 - [ ] instead of Option<...> in our frontend function signatures, use result and then the try operator so that we get our errors wrapped in json
 - [ ] revert logs should have a maximum age and a maximum count to keep the database from being huge
 - [ ] user login should also return a jwt (jsonwebtoken rust crate should make it easy)
@@ -229,7 +229,7 @@ These are not yet ordered.
   - if total difficulty is not on the block and we aren't on ETH, fetch the full block instead of just the header
   - if total difficulty is set and non-zero, use it for consensus instead of just the number
 - [ ] if we subscribe to a server that is syncing, it gives us null block_data_limit. when it catches up, we don't ever send queries to it. we need to recheck block_data_limit
-- [ ] we need concurrent request limits. these should NOT throw rate limit exceeded, instead they should wait on a dashmap of semaphores. or maybe an unbounded cache of semaphores. if the request timeout is exceeded, then we can return a rate limit exceeded error
+- [ ] we need concurrent request limits. these should NOT throw rate limit exceeded, instead they should wait on a dashmap of semaphores. or maybe an unbounded cache of Arcs. if the request timeout is exceeded, then we can return a rate limit exceeded error
 
 new endpoints for users (not totally sure about the exact paths, but these features are all needed):
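The concurrent-request-limits item above pins down the intended behavior: a request should wait on a per-ip or per-key semaphore rather than failing immediately, and only a wait that outlives the request timeout becomes a rate limit error. A minimal sketch of that flow, not code from this patch; `MAX_CONCURRENT` and `REQUEST_TIMEOUT` are invented placeholders:

```rust
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::Semaphore;
use tokio::time::timeout;

// Placeholder limits; real values would come from config or the user's plan.
const MAX_CONCURRENT: usize = 10;
const REQUEST_TIMEOUT: Duration = Duration::from_secs(5);

#[tokio::main]
async fn main() {
    // In the proxy this would be one semaphore per ip or user key, looked up
    // in a cache; a single shared one is enough to show the flow.
    let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT));

    match timeout(REQUEST_TIMEOUT, semaphore.clone().acquire_owned()).await {
        // got a permit in time: handle the request. the permit is released
        // when `_permit` drops at the end of the request.
        Ok(Ok(_permit)) => { /* proxy the request */ }
        // the semaphore was closed; treat as an internal error
        Ok(Err(_closed)) => { /* return a 500 */ }
        // waited longer than the request timeout: only NOW do we turn the
        // wait into a "rate limit exceeded" response
        Err(_elapsed) => { /* return a 429 */ }
    }
}
```

Dropping the permit on the way out of the handler is what frees the concurrency slot, so no explicit bookkeeping is needed.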
diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs
index dcc68c5f..1800c103 100644
--- a/web3_proxy/src/app.rs
+++ b/web3_proxy/src/app.rs
@@ -41,7 +41,7 @@ use std::str::FromStr;
 use std::sync::atomic::{self, AtomicUsize};
 use std::sync::Arc;
 use std::time::Duration;
-use tokio::sync::{broadcast, watch};
+use tokio::sync::{broadcast, watch, Semaphore};
 use tokio::task::JoinHandle;
 use tokio::time::timeout;
 use tokio_stream::wrappers::{BroadcastStream, WatchStream};
@@ -106,7 +106,9 @@ pub struct Web3ProxyApp {
     pub frontend_key_rate_limiter: Option<DeferredRateLimiter<Uuid>>,
     pub login_rate_limiter: Option<RedisRateLimiter>,
     pub redis_pool: Option<RedisPool>,
-    pub user_cache: Cache<Uuid, UserKeyData, hashbrown::hash_map::DefaultHashBuilder>,
+    pub user_key_cache: Cache<Uuid, UserKeyData, hashbrown::hash_map::DefaultHashBuilder>,
+    pub user_key_semaphores: Cache<u64, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
+    pub ip_semaphores: Cache<IpAddr, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
 }
 
 /// flatten a JoinError into an anyhow error
@@ -380,11 +382,19 @@ impl Web3ProxyApp {
         // all the users are the same size, so no need for a weigher
         // TODO: max_capacity from config
         // TODO: ttl from config
-        let user_cache = Cache::builder()
+        let user_key_cache = Cache::builder()
             .max_capacity(10_000)
             .time_to_live(Duration::from_secs(60))
             .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new());
 
+        // TODO: what should tti be for semaphores?
+        let user_key_semaphores = Cache::builder()
+            .time_to_idle(Duration::from_secs(120))
+            .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new());
+
+        let ip_semaphores = Cache::builder()
+            .time_to_idle(Duration::from_secs(120))
+            .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new());
+
         let app = Self {
             config: top_config.app,
             balanced_rpcs,
@@ -400,7 +410,9 @@ impl Web3ProxyApp {
             redis_pool,
             app_metrics,
             open_request_handle_metrics,
-            user_cache,
+            user_key_cache,
+            user_key_semaphores,
+            ip_semaphores,
         };
 
         let app = Arc::new(app);
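Both semaphore caches above use `time_to_idle` rather than `time_to_live`: an entry expires only after two minutes without lookups, so a busy ip or key keeps the same semaphore indefinitely. The diff leaves the cache-population future as `todo!("write this (dry)")`; presumably it just wraps a fresh `Semaphore` in an `Arc`. A self-contained sketch of that lookup, with an invented `MAX_CONCURRENT_REQUESTS` permit count:

```rust
use std::net::IpAddr;
use std::sync::Arc;
use std::time::Duration;

use moka::future::Cache;
use tokio::sync::Semaphore;

// Placeholder; the real count would come from config.
const MAX_CONCURRENT_REQUESTS: usize = 10;

async fn ip_semaphore(cache: &Cache<IpAddr, Arc<Semaphore>>, ip: IpAddr) -> Arc<Semaphore> {
    cache
        // moka coalesces concurrent `get_with` calls for the same key, so the
        // init future runs at most once and every caller shares one Arc.
        .get_with(ip, async move { Arc::new(Semaphore::new(MAX_CONCURRENT_REQUESTS)) })
        .await
}

#[tokio::main]
async fn main() {
    let ip_semaphores: Cache<IpAddr, Arc<Semaphore>> = Cache::builder()
        .time_to_idle(Duration::from_secs(120))
        .build();

    let ip: IpAddr = "127.0.0.1".parse().unwrap();
    let semaphore = ip_semaphore(&ip_semaphores, ip).await;
    assert_eq!(semaphore.available_permits(), MAX_CONCURRENT_REQUESTS);
}
```

Because every request from one ip ends up sharing a single `Arc<Semaphore>`, the cache entry itself is the concurrency limit for that ip.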
diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs
index 57d87ce1..bcb7323e 100644
--- a/web3_proxy/src/frontend/authorization.rs
+++ b/web3_proxy/src/frontend/authorization.rs
@@ -11,6 +11,7 @@ use sea_orm::{prelude::Decimal, ColumnTrait, DatabaseConnection, EntityTrait, Qu
 use serde::Serialize;
 use std::fmt::Display;
 use std::{net::IpAddr, str::FromStr, sync::Arc};
+use tokio::sync::Semaphore;
 use tokio::time::Instant;
 use tracing::{error, trace};
 use ulid::Ulid;
@@ -91,9 +92,9 @@ impl From<UserKey> for Uuid {
 #[derive(Debug)]
 pub enum RateLimitResult {
     /// contains the IP of the anonymous user
-    AllowedIp(IpAddr),
+    AllowedIp(IpAddr, Arc<Semaphore>),
     /// contains the user_key_id of an authenticated user
-    AllowedUser(UserKeyData),
+    AllowedUser(UserKeyData, Arc<Semaphore>),
     /// contains the IP and retry_at of the anonymous user
     RateLimitedIp(IpAddr, Option<Instant>),
     /// contains the user_key_id and retry_at of an authenticated user key
@@ -202,11 +203,11 @@ impl AuthorizedRequest {
 pub async fn login_is_authorized(
     app: &Web3ProxyApp,
     ip: IpAddr,
-) -> Result<AuthorizedRequest, FrontendErrorResponse> {
+) -> Result<(AuthorizedRequest, Arc<Semaphore>), FrontendErrorResponse> {
     // TODO: i think we could write an `impl From` for this
     // TODO: move this to an AuthorizedUser extractor
-    let ip = match app.rate_limit_login(ip).await? {
-        RateLimitResult::AllowedIp(x) => x,
+    let (ip, semaphore) = match app.rate_limit_login(ip).await? {
+        RateLimitResult::AllowedIp(x, semaphore) => (x, semaphore),
         RateLimitResult::RateLimitedIp(x, retry_at) => {
             return Err(FrontendErrorResponse::RateLimitedIp(x, retry_at));
         }
@@ -214,7 +215,7 @@ pub async fn login_is_authorized(
         x => unimplemented!("rate_limit_login shouldn't ever see these: {:?}", x),
     };
 
-    Ok(AuthorizedRequest::Ip(ip))
+    Ok((AuthorizedRequest::Ip(ip), semaphore))
 }
 
 pub async fn bearer_is_authorized(
@@ -224,7 +225,7 @@ pub async fn bearer_is_authorized(
     origin: Option<Origin>,
     referer: Option<Referer>,
     user_agent: Option<UserAgent>,
-) -> Result<AuthorizedRequest, FrontendErrorResponse> {
+) -> Result<(AuthorizedRequest, Arc<Semaphore>), FrontendErrorResponse> {
     let mut redis_conn = app.redis_conn().await.context("Getting redis connection")?;
 
     // TODO: verify that bearer.token() is a Ulid?
@@ -259,11 +260,11 @@ pub async fn bearer_is_authorized(
 pub async fn ip_is_authorized(
     app: &Web3ProxyApp,
     ip: IpAddr,
-) -> Result<AuthorizedRequest, FrontendErrorResponse> {
+) -> Result<(AuthorizedRequest, Arc<Semaphore>), FrontendErrorResponse> {
     // TODO: i think we could write an `impl From` for this
     // TODO: move this to an AuthorizedUser extractor
-    let ip = match app.rate_limit_by_ip(ip).await? {
-        RateLimitResult::AllowedIp(x) => x,
+    let (ip, semaphore) = match app.rate_limit_by_ip(ip).await? {
+        RateLimitResult::AllowedIp(ip, semaphore) => (ip, semaphore),
         RateLimitResult::RateLimitedIp(x, retry_at) => {
             return Err(FrontendErrorResponse::RateLimitedIp(x, retry_at));
         }
@@ -271,7 +272,7 @@ pub async fn ip_is_authorized(
         x => unimplemented!("rate_limit_by_ip shouldn't ever see these: {:?}", x),
     };
 
-    Ok(AuthorizedRequest::Ip(ip))
+    Ok((AuthorizedRequest::Ip(ip), semaphore))
 }
 
 pub async fn key_is_authorized(
@@ -281,10 +282,10 @@ pub async fn key_is_authorized(
     origin: Option<Origin>,
     referer: Option<Referer>,
     user_agent: Option<UserAgent>,
-) -> Result<AuthorizedRequest, FrontendErrorResponse> {
+) -> Result<(AuthorizedRequest, Arc<Semaphore>), FrontendErrorResponse> {
     // check the rate limits. error if over the limit
-    let user_data = match app.rate_limit_by_key(user_key).await? {
-        RateLimitResult::AllowedUser(x) => x,
+    let (user_data, semaphore) = match app.rate_limit_by_key(user_key).await? {
+        RateLimitResult::AllowedUser(x, semaphore) => (x, semaphore),
         RateLimitResult::RateLimitedUser(x, retry_at) => {
             return Err(FrontendErrorResponse::RateLimitedUser(x, retry_at));
         }
@@ -297,7 +298,7 @@ pub async fn key_is_authorized(
 
     let db = app.db_conn.clone();
 
-    Ok(AuthorizedRequest::User(db, authorized_user))
+    Ok((AuthorizedRequest::User(db, authorized_user), semaphore))
 }
 
 impl Web3ProxyApp {
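Each authorization helper now hands its caller the `Arc<Semaphore>` next to the `AuthorizedRequest`, and the handler diffs further down bind it to `_semaphore` so the `Arc` stays alive for the whole request. Note that nothing in this patch acquires a permit yet. A hypothetical follow-up at the top of the proxy path (not in this commit; error conversion elided) might look like:

```rust
// hypothetical follow-up, not part of this patch:
let (authorized_request, semaphore) = ip_is_authorized(&app, ip).await?;

// holding a permit (instead of just the Arc) is what would actually bound
// concurrency; the slot is freed when `_permit` is dropped
let _permit = semaphore.acquire_owned().await?;
```

Holding the `Arc` alone blocks nothing; it only keeps the cache entry's idle timer fresh. The concurrency bound arrives once a permit is actually held.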
allowing ip"); - Ok(RateLimitResult::AllowedIp(ip)) + + let semaphore = self + .ip_semaphores + .get_with(ip, async move { + todo!("write this (dry)"); + }) + .await; + + Ok(RateLimitResult::AllowedIp(ip, semaphore)) } } } else { @@ -339,7 +357,16 @@ impl Web3ProxyApp { // TODO: query redis in the background so that users don't have to wait on this network request if let Some(rate_limiter) = &self.frontend_ip_rate_limiter { match rate_limiter.throttle(ip, None, 1).await { - Ok(DeferredRateLimitResult::Allowed) => Ok(RateLimitResult::AllowedIp(ip)), + Ok(DeferredRateLimitResult::Allowed) => { + let semaphore = self + .ip_semaphores + .get_with(ip, async move { + todo!("write this (dry)"); + }) + .await; + + Ok(RateLimitResult::AllowedIp(ip, semaphore)) + } Ok(DeferredRateLimitResult::RetryAt(retry_at)) => { // TODO: set headers so they know when they can retry // TODO: debug or trace? @@ -356,7 +383,15 @@ impl Web3ProxyApp { // internal error, not rate limit being hit // TODO: i really want axum to do this for us in a single place. error!(?err, "rate limiter is unhappy. allowing ip"); - Ok(RateLimitResult::AllowedIp(ip)) + + let semaphore = self + .ip_semaphores + .get_with(ip, async move { + todo!("write this (dry)"); + }) + .await; + + Ok(RateLimitResult::AllowedIp(ip, semaphore)) } } } else { @@ -368,7 +403,7 @@ impl Web3ProxyApp { // check the local cache for user data, or query the database pub(crate) async fn user_data(&self, user_key: UserKey) -> anyhow::Result { let user_data: Result<_, Arc> = self - .user_cache + .user_key_cache .try_get_with(user_key.into(), async move { trace!(?user_key, "user_cache miss"); @@ -457,7 +492,16 @@ impl Web3ProxyApp { } let user_max_requests_per_period = match user_data.user_max_requests_per_period { - None => return Ok(RateLimitResult::AllowedUser(user_data)), + None => { + let semaphore = self + .user_key_semaphores + .get_with(user_data.user_key_id, async move { + todo!("write this"); + }) + .await; + + return Ok(RateLimitResult::AllowedUser(user_data, semaphore)); + } Some(x) => x, }; @@ -467,7 +511,16 @@ impl Web3ProxyApp { .throttle(user_key.into(), Some(user_max_requests_per_period), 1) .await { - Ok(DeferredRateLimitResult::Allowed) => Ok(RateLimitResult::AllowedUser(user_data)), + Ok(DeferredRateLimitResult::Allowed) => { + let semaphore = self + .user_key_semaphores + .get_with(user_data.user_key_id, async move { + todo!("write this (dry)"); + }) + .await; + + Ok(RateLimitResult::AllowedUser(user_data, semaphore)) + } Ok(DeferredRateLimitResult::RetryAt(retry_at)) => { // TODO: set headers so they know when they can retry // TODO: debug or trace? @@ -485,7 +538,15 @@ impl Web3ProxyApp { // internal error, not rate limit being hit // TODO: i really want axum to do this for us in a single place. error!(?err, "rate limiter is unhappy. 
allowing ip"); - Ok(RateLimitResult::AllowedUser(user_data)) + + let semaphore = self + .user_key_semaphores + .get_with(user_data.user_key_id, async move { + todo!("write this (dry)"); + }) + .await; + + Ok(RateLimitResult::AllowedUser(user_data, semaphore)) } } } else { diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs index f0b3fdf6..a8d84a66 100644 --- a/web3_proxy/src/frontend/rpc_proxy_http.rs +++ b/web3_proxy/src/frontend/rpc_proxy_http.rs @@ -21,7 +21,8 @@ pub async fn proxy_web3_rpc( ) -> FrontendResult { let request_span = error_span!("request", %ip, ?referer, ?user_agent); - let authorized_request = if let Some(TypedHeader(Authorization(bearer))) = bearer { + let (authorized_request, _semaphore) = if let Some(TypedHeader(Authorization(bearer))) = bearer + { let origin = origin.map(|x| x.0); let referer = referer.map(|x| x.0); let user_agent = user_agent.map(|x| x.0); @@ -64,7 +65,7 @@ pub async fn proxy_web3_rpc_with_key( let request_span = error_span!("request", %ip, ?referer, ?user_agent); // TODO: this should probably return the user_key_id instead? or maybe both? - let authorized_request = key_is_authorized( + let (authorized_request, _semaphore) = key_is_authorized( &app, user_key, ip, diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index b048cabd..db2fc343 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -40,7 +40,8 @@ pub async fn websocket_handler( ) -> FrontendResult { let request_span = error_span!("request", %ip, ?referer, ?user_agent); - let authorized_request = if let Some(TypedHeader(Authorization(bearer))) = bearer { + let (authorized_request, _semaphore) = if let Some(TypedHeader(Authorization(bearer))) = bearer + { let origin = origin.map(|x| x.0); let referer = referer.map(|x| x.0); let user_agent = user_agent.map(|x| x.0); @@ -85,7 +86,7 @@ pub async fn websocket_handler_with_key( let request_span = error_span!("request", %ip, ?referer, ?user_agent); - let authorized_request = key_is_authorized( + let (authorized_request, _semaphore) = key_is_authorized( &app, user_key, ip, diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs index 36322ab2..dc3f1c09 100644 --- a/web3_proxy/src/frontend/status.rs +++ b/web3_proxy/src/frontend/status.rs @@ -24,14 +24,14 @@ pub async fn prometheus(Extension(app): Extension>) -> impl In /// TODO: replace this with proper stats and monitoring pub async fn status(Extension(app): Extension>) -> impl IntoResponse { app.pending_transactions.sync(); - app.user_cache.sync(); + app.user_key_cache.sync(); // TODO: what else should we include? uptime, cache hit rates, cpu load let body = json!({ "pending_transactions_count": app.pending_transactions.entry_count(), "pending_transactions_size": app.pending_transactions.weighted_size(), - "user_cache_count": app.user_cache.entry_count(), - "user_cache_size": app.user_cache.weighted_size(), + "user_cache_count": app.user_key_cache.entry_count(), + "user_cache_size": app.user_key_cache.weighted_size(), "balanced_rpcs": app.balanced_rpcs, "private_rpcs": app.private_rpcs, });