From 53f9551180ac89134780753132a44cea0a7d16c5 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 28 Sep 2022 03:35:55 +0000 Subject: [PATCH] finish adding semaphores --- Cargo.lock | 23 +--- TODO.md | 5 +- deferred-rate-limiter/Cargo.toml | 2 +- entities/Cargo.toml | 2 +- entities/src/user_keys.rs | 1 + migration/Cargo.toml | 2 +- migration/src/lib.rs | 2 + migration/src/m20220921_181610_log_reverts.rs | 7 +- redis-rate-limiter/Cargo.toml | 2 +- web3_proxy/Cargo.toml | 3 +- web3_proxy/src/app.rs | 6 +- web3_proxy/src/frontend/authorization.rs | 119 +++++++++--------- web3_proxy/src/frontend/users.rs | 18 +-- 13 files changed, 89 insertions(+), 103 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8384a809..c90a3d8f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1197,19 +1197,6 @@ dependencies = [ "cipher", ] -[[package]] -name = "dashmap" -version = "5.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" -dependencies = [ - "cfg-if", - "hashbrown", - "lock_api", - "once_cell", - "parking_lot_core 0.9.3", -] - [[package]] name = "deadpool" version = "0.9.5" @@ -1457,7 +1444,7 @@ dependencies = [ [[package]] name = "entities" -version = "0.2.0" +version = "0.3.0" dependencies = [ "sea-orm", "serde", @@ -2770,7 +2757,7 @@ dependencies = [ [[package]] name = "migration" -version = "0.2.0" +version = "0.3.0" dependencies = [ "async-std", "sea-orm-migration", @@ -4948,9 +4935,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.21.1" +version = "1.21.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0020c875007ad96677dcc890298f4b942882c5d4eb7cc8f439fc3bf813dc9c95" +checksum = "a9e03c497dc955702ba729190dc4aac6f2a0ce97f913e5b1b5912fc5039d9099" dependencies = [ "autocfg 1.1.0", "bytes", @@ -4958,7 +4945,6 @@ dependencies = [ "memchr", "mio", "num_cpus", - "once_cell", "parking_lot 0.12.1", "pin-project-lite", "signal-hook-registry", @@ -5528,7 +5514,6 @@ dependencies = [ "axum-macros", "chrono", "counter", - "dashmap", "deferred-rate-limiter", "derive_more", "dotenv", diff --git a/TODO.md b/TODO.md index 12a0a5e2..013dabe0 100644 --- a/TODO.md +++ b/TODO.md @@ -172,6 +172,7 @@ These are roughly in order of completition - - since users are actively using our service, we will need to support both - [x] get to /, when not serving a websocket, should have a simple welcome page. maybe with a button to update your wallet. +- [x] we need concurrent requests limits. these should NOT throw rate limit exceeded, instead they should wait on a dashmap of semaphores. or maybe an unbounded cache of Arcs. if the request timeout is exceeded, then we can return a rate limit exceeded error - [ ] active requests per second per api key - [ ] parallel requests per api key - [ ] distribution of methods per api key (eth_call, eth_getLogs, etc.) @@ -179,6 +180,8 @@ These are roughly in order of completition - this must be opt-in or spawned since it will slow things down and will make their calls less private - [ ] we currently default to 0.0 and don't expose a way to edit it. 
we have a database row, but we don't use it - [-] add configurable size limits to all the Caches + - [ ] instead of configuring each cache with MB sizes, have one value for total memory footprint and then percentages + - [ ] if user cache has evictions that aren't from timeouts, log a warning - [ ] endpoint for creating/modifying api keys and their advanced security features - [ ] BUG: i think if all backend servers stop, the server doesn't properly reconnect. It appears to stop listening on 8854, but not shut down. - [ ] option to rotate api key @@ -229,8 +232,6 @@ These are not yet ordered. - if total difficulty is not on the block and we aren't on ETH, fetch the full block instead of just the header - if total difficulty is set and non-zero, use it for consensus instead of just the number - [ ] if we subscribe to a server that is syncing, it gives us null block_data_limit. when it catches up, we don't ever send queries to it. we need to recheck block_data_limit -- [ ] we need concurrent requests limits. these should NOT throw rate limit exceeded, instead they should wait on a dashmap of semaphores. or maybe an unbounded cache of Arcs. if the request timeout is exceeded, then we can return a rate limit exceeded error - new endpoints for users (not totally sure about the exact paths, but these features are all needed): - [x] GET /u/:api_key diff --git a/deferred-rate-limiter/Cargo.toml b/deferred-rate-limiter/Cargo.toml index 9acdac5f..fb1f9c94 100644 --- a/deferred-rate-limiter/Cargo.toml +++ b/deferred-rate-limiter/Cargo.toml @@ -10,5 +10,5 @@ redis-rate-limiter = { path = "../redis-rate-limiter" } anyhow = "1.0.65" hashbrown = "0.12.3" moka = { version = "0.9.4", default-features = false, features = ["future"] } -tokio = "1.21.1" +tokio = "1.21.2" tracing = "0.1.36" diff --git a/entities/Cargo.toml b/entities/Cargo.toml index 17ec4d19..5e1da31f 100644 --- a/entities/Cargo.toml +++ b/entities/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "entities" -version = "0.2.0" +version = "0.3.0" edition = "2021" [lib] diff --git a/entities/src/user_keys.rs b/entities/src/user_keys.rs index 7d823a4e..81e95552 100644 --- a/entities/src/user_keys.rs +++ b/entities/src/user_keys.rs @@ -16,6 +16,7 @@ pub struct Model { pub private_txs: bool, pub active: bool, pub requests_per_minute: Option, + pub max_concurrent_requests: Option, #[sea_orm(column_type = "Decimal(Some((5, 4)))")] pub log_revert_chance: Decimal, #[sea_orm(column_type = "Text", nullable)] diff --git a/migration/Cargo.toml b/migration/Cargo.toml index ee937f96..535281a2 100644 --- a/migration/Cargo.toml +++ b/migration/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "migration" -version = "0.2.0" +version = "0.3.0" edition = "2021" publish = false diff --git a/migration/src/lib.rs b/migration/src/lib.rs index 6780b8e9..d5430fda 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -2,6 +2,7 @@ pub use sea_orm_migration::prelude::*; mod m20220101_000001_create_table; mod m20220921_181610_log_reverts; +mod m20220928_015108_concurrency_limits; pub struct Migrator; @@ -11,6 +12,7 @@ impl MigratorTrait for Migrator { vec![ Box::new(m20220101_000001_create_table::Migration), Box::new(m20220921_181610_log_reverts::Migration), + Box::new(m20220928_015108_concurrency_limits::Migration), ] } } diff --git a/migration/src/m20220921_181610_log_reverts.rs b/migration/src/m20220921_181610_log_reverts.rs index 12b5e89e..26282c85 100644 --- a/migration/src/m20220921_181610_log_reverts.rs +++ b/migration/src/m20220921_181610_log_reverts.rs @@ -84,20 
+84,23 @@ impl MigrationTrait for Migration { .alter_table( sea_query::Table::alter() .table(UserKeys::Table) - .to_owned() .modify_column( ColumnDef::new(UserKeys::RequestsPerMinute) .unsigned() .not_null(), ) .drop_column(UserKeys::LogRevertChance) + .drop_column(UserKeys::AllowedIps) + .drop_column(UserKeys::AllowedOrigins) + .drop_column(UserKeys::AllowedReferers) + .drop_column(UserKeys::AllowedUserAgents) .to_owned(), ) .await } } -// copied from create_table.rs, but added +// copied from create_table.rs, but added new columns #[derive(Iden)] pub enum UserKeys { Table, diff --git a/redis-rate-limiter/Cargo.toml b/redis-rate-limiter/Cargo.toml index cb580f6a..58e3da52 100644 --- a/redis-rate-limiter/Cargo.toml +++ b/redis-rate-limiter/Cargo.toml @@ -8,4 +8,4 @@ edition = "2021" anyhow = "1.0.65" deadpool-redis = { version = "0.10.2", features = ["rt_tokio_1", "serde"] } tracing = "0.1.36" -tokio = "1.21.1" +tokio = "1.21.2" diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 979a7ac5..a6ef3acd 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -28,7 +28,6 @@ axum-macros = "0.2.3" # TODO: import chrono from sea-orm so we always have the same version chrono = "0.4.22" counter = "0.5.6" -dashmap = "5.4.0" derive_more = "0.99.17" dotenv = "0.15.0" ethers = { version = "0.17.0", features = ["rustls", "ws"] } @@ -60,7 +59,7 @@ serde_json = { version = "1.0.85", default-features = false, features = ["alloc" serde_prometheus = "0.1.6" # TODO: make sure this time version matches siwe. PR to put this in their prelude time = "0.3.14" -tokio = { version = "1.21.1", features = ["full", "tracing"] } +tokio = { version = "1.21.2", features = ["full", "tracing"] } # TODO: make sure this uuid version matches sea-orm. PR to put this in their prelude tokio-stream = { version = "0.1.10", features = ["sync"] } toml = "0.5.9" diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index 1800c103..e1c198a6 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -68,8 +68,10 @@ pub type AnyhowJoinHandle = JoinHandle>; /// TODO: rename this? pub struct UserKeyData { pub user_key_id: u64, - /// if u64::MAX, allow unlimited queries - pub user_max_requests_per_period: Option, + /// if None, allow unlimited queries + pub max_requests_per_period: Option, + // if None, allow unlimited concurrent requests + pub max_concurrent_requests: Option, /// if None, allow any Origin pub allowed_origins: Option>, /// if None, allow any Referer diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index bcb7323e..1be2bc74 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -11,7 +11,7 @@ use sea_orm::{prelude::Decimal, ColumnTrait, DatabaseConnection, EntityTrait, Qu use serde::Serialize; use std::fmt::Display; use std::{net::IpAddr, str::FromStr, sync::Arc}; -use tokio::sync::Semaphore; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio::time::Instant; use tracing::{error, trace}; use ulid::Ulid; @@ -92,9 +92,10 @@ impl From for Uuid { #[derive(Debug)] pub enum RateLimitResult { /// contains the IP of the anonymous user - AllowedIp(IpAddr, Arc), + /// TODO: option inside or outside the arc? 
+ AllowedIp(IpAddr, OwnedSemaphorePermit), /// contains the user_key_id of an authenticated user - AllowedUser(UserKeyData, Arc), + AllowedUser(UserKeyData, Option), /// contains the IP and retry_at of the anonymous user RateLimitedIp(IpAddr, Option), /// contains the user_key_id and retry_at of an authenticated user key @@ -203,7 +204,7 @@ impl AuthorizedRequest { pub async fn login_is_authorized( app: &Web3ProxyApp, ip: IpAddr, -) -> Result<(AuthorizedRequest, Arc), FrontendErrorResponse> { +) -> Result<(AuthorizedRequest, OwnedSemaphorePermit), FrontendErrorResponse> { // TODO: i think we could write an `impl From` for this // TODO: move this to an AuthorizedUser extrator let (ip, semaphore) = match app.rate_limit_login(ip).await? { @@ -225,7 +226,7 @@ pub async fn bearer_is_authorized( origin: Option, referer: Option, user_agent: Option, -) -> Result<(AuthorizedRequest, Arc), FrontendErrorResponse> { +) -> Result<(AuthorizedRequest, Option), FrontendErrorResponse> { let mut redis_conn = app.redis_conn().await.context("Getting redis connection")?; // TODO: verify that bearer.token() is a Ulid? @@ -260,11 +261,11 @@ pub async fn bearer_is_authorized( pub async fn ip_is_authorized( app: &Web3ProxyApp, ip: IpAddr, -) -> Result<(AuthorizedRequest, Arc), FrontendErrorResponse> { +) -> Result<(AuthorizedRequest, Option), FrontendErrorResponse> { // TODO: i think we could write an `impl From` for this // TODO: move this to an AuthorizedUser extrator let (ip, semaphore) = match app.rate_limit_by_ip(ip).await? { - RateLimitResult::AllowedIp(ip, semaphore) => (ip, semaphore), + RateLimitResult::AllowedIp(ip, semaphore) => (ip, Some(semaphore)), RateLimitResult::RateLimitedIp(x, retry_at) => { return Err(FrontendErrorResponse::RateLimitedIp(x, retry_at)); } @@ -272,6 +273,7 @@ pub async fn ip_is_authorized( x => unimplemented!("rate_limit_by_ip shouldn't ever see these: {:?}", x), }; + // semaphore won't ever be None, but its easier if key auth and ip auth work the same way Ok((AuthorizedRequest::Ip(ip), semaphore)) } @@ -282,7 +284,7 @@ pub async fn key_is_authorized( origin: Option, referer: Option, user_agent: Option, -) -> Result<(AuthorizedRequest, Arc), FrontendErrorResponse> { +) -> Result<(AuthorizedRequest, Option), FrontendErrorResponse> { // check the rate limits. error if over the limit let (user_data, semaphore) = match app.rate_limit_by_key(user_key).await? 
{ RateLimitResult::AllowedUser(x, semaphore) => (x, semaphore), @@ -302,20 +304,50 @@ pub async fn key_is_authorized( } impl Web3ProxyApp { + pub async fn ip_semaphore(&self, ip: IpAddr) -> anyhow::Result { + let semaphore = self + .ip_semaphores + .get_with(ip, async move { + // TODO: get semaphore size from app config + let s = Semaphore::const_new(10); + Arc::new(s) + }) + .await; + + let semaphore_permit = semaphore.acquire_owned().await?; + + Ok(semaphore_permit) + } + + pub async fn user_key_semaphore( + &self, + user_data: &UserKeyData, + ) -> anyhow::Result> { + if let Some(max_concurrent_requests) = user_data.max_concurrent_requests { + let semaphore = self + .user_key_semaphores + .get_with(user_data.user_key_id, async move { + let s = Semaphore::const_new(max_concurrent_requests.try_into().unwrap()); + trace!("new semaphore for user_key_id {}", user_data.user_key_id); + Arc::new(s) + }) + .await; + + let semaphore_permit = semaphore.acquire_owned().await?; + + Ok(Some(semaphore_permit)) + } else { + Ok(None) + } + } + pub async fn rate_limit_login(&self, ip: IpAddr) -> anyhow::Result { // TODO: dry this up with rate_limit_by_key - // TODO: have a local cache because if we hit redis too hard we get errors - // TODO: query redis in the background so that users don't have to wait on this network request + let semaphore = self.ip_semaphore(ip).await?; + if let Some(rate_limiter) = &self.login_rate_limiter { match rate_limiter.throttle_label(&ip.to_string(), None, 1).await { Ok(RedisRateLimitResult::Allowed(_)) => { - let semaphore = self - .ip_semaphores - .get_with(ip, async move { - todo!("write this (dry)"); - }) - .await; - Ok(RateLimitResult::AllowedIp(ip, semaphore)) } Ok(RedisRateLimitResult::RetryAt(retry_at, _)) => { @@ -335,13 +367,6 @@ impl Web3ProxyApp { // TODO: i really want axum to do this for us in a single place. error!(?err, "login rate limiter is unhappy. allowing ip"); - let semaphore = self - .ip_semaphores - .get_with(ip, async move { - todo!("write this (dry)"); - }) - .await; - Ok(RateLimitResult::AllowedIp(ip, semaphore)) } } @@ -353,18 +378,11 @@ impl Web3ProxyApp { pub async fn rate_limit_by_ip(&self, ip: IpAddr) -> anyhow::Result { // TODO: dry this up with rate_limit_by_key - // TODO: have a local cache because if we hit redis too hard we get errors - // TODO: query redis in the background so that users don't have to wait on this network request + let semaphore = self.ip_semaphore(ip).await?; + if let Some(rate_limiter) = &self.frontend_ip_rate_limiter { match rate_limiter.throttle(ip, None, 1).await { Ok(DeferredRateLimitResult::Allowed) => { - let semaphore = self - .ip_semaphores - .get_with(ip, async move { - todo!("write this (dry)"); - }) - .await; - Ok(RateLimitResult::AllowedIp(ip, semaphore)) } Ok(DeferredRateLimitResult::RetryAt(retry_at)) => { @@ -384,13 +402,6 @@ impl Web3ProxyApp { // TODO: i really want axum to do this for us in a single place. error!(?err, "rate limiter is unhappy. 
allowing ip"); - let semaphore = self - .ip_semaphores - .get_with(ip, async move { - todo!("write this (dry)"); - }) - .await; - Ok(RateLimitResult::AllowedIp(ip, semaphore)) } } @@ -467,7 +478,8 @@ impl Web3ProxyApp { Ok(UserKeyData { user_key_id: user_key_model.id, - user_max_requests_per_period: user_key_model.requests_per_minute, + max_requests_per_period: user_key_model.requests_per_minute, + max_concurrent_requests: user_key_model.max_concurrent_requests, allowed_ips, allowed_origins, allowed_referers, @@ -491,15 +503,10 @@ impl Web3ProxyApp { return Ok(RateLimitResult::UnknownKey); } - let user_max_requests_per_period = match user_data.user_max_requests_per_period { - None => { - let semaphore = self - .user_key_semaphores - .get_with(user_data.user_key_id, async move { - todo!("write this"); - }) - .await; + let semaphore = self.user_key_semaphore(&user_data).await?; + let user_max_requests_per_period = match user_data.max_requests_per_period { + None => { return Ok(RateLimitResult::AllowedUser(user_data, semaphore)); } Some(x) => x, @@ -512,13 +519,6 @@ impl Web3ProxyApp { .await { Ok(DeferredRateLimitResult::Allowed) => { - let semaphore = self - .user_key_semaphores - .get_with(user_data.user_key_id, async move { - todo!("write this (dry)"); - }) - .await; - Ok(RateLimitResult::AllowedUser(user_data, semaphore)) } Ok(DeferredRateLimitResult::RetryAt(retry_at)) => { @@ -539,13 +539,6 @@ impl Web3ProxyApp { // TODO: i really want axum to do this for us in a single place. error!(?err, "rate limiter is unhappy. allowing ip"); - let semaphore = self - .user_key_semaphores - .get_with(user_data.user_key_id, async move { - todo!("write this (dry)"); - }) - .await; - Ok(RateLimitResult::AllowedUser(user_data, semaphore)) } } diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index 49b294a6..58d50c70 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -41,7 +41,8 @@ pub async fn get_login( // TODO: allow ENS names here? Path(mut params): Path>, ) -> FrontendResult { - let _ = login_is_authorized(&app, ip).await?; + // give these named variables so that we drop them at the very end of this function + let (_, _semaphore) = login_is_authorized(&app, ip).await?; // at first i thought about checking that user_address is in our db // but theres no need to separate the registration and login flows @@ -145,7 +146,8 @@ pub async fn post_login( Json(payload): Json, Query(query): Query, ) -> FrontendResult { - let _ = login_is_authorized(&app, ip).await?; + // give these named variables so that we drop them at the very end of this function + let (_, _semaphore) = login_is_authorized(&app, ip).await?; if let Some(invite_code) = &app.config.invite_code { // we don't do per-user referral codes because we shouldn't collect what we don't need. @@ -293,7 +295,8 @@ pub async fn post_user( Extension(app): Extension>, Json(payload): Json, ) -> FrontendResult { - let _ = login_is_authorized(&app, ip).await?; + // give these named variables so that we drop them at the very end of this function + let (_, _semaphore) = login_is_authorized(&app, ip).await?; let user = ProtectedAction::PostUser .verify(app.as_ref(), bearer_token, &payload.primary_address) @@ -303,7 +306,10 @@ pub async fn post_user( // TODO: rate limit by user, too? + // TODO: allow changing the primary address, too. 
require a message from the new address to finish the change + if let Some(x) = payload.email { + // TODO: only Set if no change if x.is_empty() { user.email = sea_orm::Set(None); } else { @@ -315,12 +321,6 @@ pub async fn post_user( user.save(db).await?; - // let user = user::ActiveModel { - // address: sea_orm::Set(payload.address.to_fixed_bytes().into()), - // email: sea_orm::Set(payload.email), - // ..Default::default() - // }; - todo!("finish post_user"); }
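
Notes on the concurrency-limit pattern (appended commentary, not part of the patch itself):

The TODO entry checked off above considered "a dashmap of semaphores. or maybe an unbounded cache of Arcs"; the patch goes with the cache: a moka `Cache` of `Arc<Semaphore>` keyed by IP (and another keyed by `user_key_id`), with each request acquiring an `OwnedSemaphorePermit` that it holds until the response is finished, so requests over the limit wait on the semaphore instead of getting a rate-limit error. The sketch below is a minimal standalone version of that idea, assuming a hypothetical `ConcurrencyLimiter` wrapper and a time-to-idle eviction policy; the patch instead stores the caches directly on `Web3ProxyApp` and hard-codes 10 permits per IP (with a TODO to read the size from the app config).

```rust
use std::net::IpAddr;
use std::sync::Arc;
use std::time::Duration;

use moka::future::Cache;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

/// One semaphore per IP, created lazily and cached, so that concurrent
/// requests from the same IP queue instead of erroring out.
pub struct ConcurrencyLimiter {
    ip_semaphores: Cache<IpAddr, Arc<Semaphore>>,
    max_concurrent_requests: usize,
}

impl ConcurrencyLimiter {
    pub fn new(max_concurrent_requests: usize) -> Self {
        Self {
            // time-to-idle keeps the cache from growing without bound;
            // the exact eviction policy here is an assumption, not from the patch
            ip_semaphores: Cache::builder()
                .time_to_idle(Duration::from_secs(120))
                .build(),
            max_concurrent_requests,
        }
    }

    /// Wait for a slot instead of returning "rate limit exceeded".
    /// The slot is released when the returned permit is dropped.
    pub async fn ip_permit(&self, ip: IpAddr) -> anyhow::Result<OwnedSemaphorePermit> {
        let max = self.max_concurrent_requests;

        // get_with only runs the init future the first time this IP is seen
        let semaphore = self
            .ip_semaphores
            .get_with(ip, async move { Arc::new(Semaphore::new(max)) })
            .await;

        // acquire_owned ties the permit to the Arc, so it can be moved into the
        // request handler and outlive this borrow of the limiter
        let permit = semaphore.acquire_owned().await?;

        Ok(permit)
    }
}
```

The per-key variant in the patch works the same way, except the result is `Option<OwnedSemaphorePermit>` because `max_concurrent_requests` is a nullable per-user setting, and using a bounded cache rather than a `DashMap` lines up with `dashmap` being dropped from `Cargo.toml` and `Cargo.lock` in this commit.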
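
The handler changes in `users.rs` replace `let _ = login_is_authorized(&app, ip).await?;` with `let (_, _semaphore) = login_is_authorized(&app, ip).await?;`, and the added comment explains why: in Rust, `let _ = x` drops the value immediately, while a named binding like `_semaphore` keeps the permit alive until the end of the function, which is what actually enforces the concurrency limit for the duration of the request. A small self-contained illustration of that difference (not from the codebase):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(1));

    {
        // a named binding holds the permit until the end of this block
        let _permit = semaphore.clone().acquire_owned().await.unwrap();
        assert_eq!(semaphore.available_permits(), 0);
    }
    // the permit was dropped when the block ended
    assert_eq!(semaphore.available_permits(), 1);

    // `let _ = ...` drops the permit as soon as the statement finishes,
    // so it would not limit anything for the rest of a request
    let _ = semaphore.clone().acquire_owned().await.unwrap();
    assert_eq!(semaphore.available_permits(), 1);
}
```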
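
The new migration module `m20220928_015108_concurrency_limits` is registered in `migration/src/lib.rs`, but the file itself does not appear in this diff (it is not in the diffstat), so its contents cannot be quoted here. For orientation only, a sea-orm-migration that adds a nullable unsigned `max_concurrent_requests` column to `user_keys` would look roughly like the sketch below; every detail of it (column type, down-migration, naming) is an assumption, not the author's code.

```rust
use sea_orm_migration::prelude::*;

// entirely a guess at the missing migration; see the note above
#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .alter_table(
                sea_query::Table::alter()
                    .table(UserKeys::Table)
                    // nullable, because None means "no concurrency limit" in UserKeyData
                    .add_column(
                        ColumnDef::new(UserKeys::MaxConcurrentRequests)
                            .unsigned()
                            .null(),
                    )
                    .to_owned(),
            )
            .await
    }

    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .alter_table(
                sea_query::Table::alter()
                    .table(UserKeys::Table)
                    .drop_column(UserKeys::MaxConcurrentRequests)
                    .to_owned(),
            )
            .await
    }
}

#[derive(Iden)]
enum UserKeys {
    Table,
    MaxConcurrentRequests,
}
```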