finish adding semaphores

This commit is contained in:
Bryan Stitt 2022-09-28 03:35:55 +00:00
parent 0c7a2b05ec
commit 53f9551180
13 changed files with 89 additions and 103 deletions

23
Cargo.lock generated

@ -1197,19 +1197,6 @@ dependencies = [
"cipher",
]
[[package]]
name = "dashmap"
version = "5.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc"
dependencies = [
"cfg-if",
"hashbrown",
"lock_api",
"once_cell",
"parking_lot_core 0.9.3",
]
[[package]]
name = "deadpool"
version = "0.9.5"
@ -1457,7 +1444,7 @@ dependencies = [
[[package]]
name = "entities"
version = "0.2.0"
version = "0.3.0"
dependencies = [
"sea-orm",
"serde",
@ -2770,7 +2757,7 @@ dependencies = [
[[package]]
name = "migration"
version = "0.2.0"
version = "0.3.0"
dependencies = [
"async-std",
"sea-orm-migration",
@ -4948,9 +4935,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "1.21.1"
version = "1.21.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0020c875007ad96677dcc890298f4b942882c5d4eb7cc8f439fc3bf813dc9c95"
checksum = "a9e03c497dc955702ba729190dc4aac6f2a0ce97f913e5b1b5912fc5039d9099"
dependencies = [
"autocfg 1.1.0",
"bytes",
@ -4958,7 +4945,6 @@ dependencies = [
"memchr",
"mio",
"num_cpus",
"once_cell",
"parking_lot 0.12.1",
"pin-project-lite",
"signal-hook-registry",
@ -5528,7 +5514,6 @@ dependencies = [
"axum-macros",
"chrono",
"counter",
"dashmap",
"deferred-rate-limiter",
"derive_more",
"dotenv",

@ -172,6 +172,7 @@ These are roughly in order of completition
- <https://discord.com/channels/873880840487206962/900758376164757555/1012942974608474142>
- since users are actively using our service, we will need to support both
- [x] get to /, when not serving a websocket, should have a simple welcome page. maybe with a button to update your wallet.
- [x] we need concurrent requests limits. these should NOT throw rate limit exceeded, instead they should wait on a dashmap of semaphores. or maybe an unbounded cache of Arc<tokio::sync::Semaphore>s. if the request timeout is exceeded, then we can return a rate limit exceeded error
- [ ] active requests per second per api key
- [ ] parallel requests per api key
- [ ] distribution of methods per api key (eth_call, eth_getLogs, etc.)
@ -179,6 +180,8 @@ These are roughly in order of completition
- this must be opt-in or spawned since it will slow things down and will make their calls less private
- [ ] we currently default to 0.0 and don't expose a way to edit it. we have a database row, but we don't use it
- [-] add configurable size limits to all the Caches
- [ ] instead of configuring each cache with MB sizes, have one value for total memory footprint and then percentages
- [ ] if user cache has evictions that aren't from timeouts, log a warning
- [ ] endpoint for creating/modifying api keys and their advanced security features
- [ ] BUG: i think if all backend servers stop, the server doesn't properly reconnect. It appears to stop listening on 8854, but not shut down.
- [ ] option to rotate api key
@ -229,8 +232,6 @@ These are not yet ordered.
- if total difficulty is not on the block and we aren't on ETH, fetch the full block instead of just the header
- if total difficulty is set and non-zero, use it for consensus instead of just the number
- [ ] if we subscribe to a server that is syncing, it gives us null block_data_limit. when it catches up, we don't ever send queries to it. we need to recheck block_data_limit
- [ ] we need concurrent requests limits. these should NOT throw rate limit exceeded, instead they should wait on a dashmap of semaphores. or maybe an unbounded cache of Arc<tokio::sync::Semaphore>s. if the request timeout is exceeded, then we can return a rate limit exceeded error
new endpoints for users (not totally sure about the exact paths, but these features are all needed):
- [x] GET /u/:api_key

@ -10,5 +10,5 @@ redis-rate-limiter = { path = "../redis-rate-limiter" }
anyhow = "1.0.65"
hashbrown = "0.12.3"
moka = { version = "0.9.4", default-features = false, features = ["future"] }
tokio = "1.21.1"
tokio = "1.21.2"
tracing = "0.1.36"

@ -1,6 +1,6 @@
[package]
name = "entities"
version = "0.2.0"
version = "0.3.0"
edition = "2021"
[lib]

@ -16,6 +16,7 @@ pub struct Model {
pub private_txs: bool,
pub active: bool,
pub requests_per_minute: Option<u64>,
pub max_concurrent_requests: Option<u64>,
#[sea_orm(column_type = "Decimal(Some((5, 4)))")]
pub log_revert_chance: Decimal,
#[sea_orm(column_type = "Text", nullable)]

@ -1,6 +1,6 @@
[package]
name = "migration"
version = "0.2.0"
version = "0.3.0"
edition = "2021"
publish = false

@ -2,6 +2,7 @@ pub use sea_orm_migration::prelude::*;
mod m20220101_000001_create_table;
mod m20220921_181610_log_reverts;
mod m20220928_015108_concurrency_limits;
pub struct Migrator;
@ -11,6 +12,7 @@ impl MigratorTrait for Migrator {
vec![
Box::new(m20220101_000001_create_table::Migration),
Box::new(m20220921_181610_log_reverts::Migration),
Box::new(m20220928_015108_concurrency_limits::Migration),
]
}
}

@ -84,20 +84,23 @@ impl MigrationTrait for Migration {
.alter_table(
sea_query::Table::alter()
.table(UserKeys::Table)
.to_owned()
.modify_column(
ColumnDef::new(UserKeys::RequestsPerMinute)
.unsigned()
.not_null(),
)
.drop_column(UserKeys::LogRevertChance)
.drop_column(UserKeys::AllowedIps)
.drop_column(UserKeys::AllowedOrigins)
.drop_column(UserKeys::AllowedReferers)
.drop_column(UserKeys::AllowedUserAgents)
.to_owned(),
)
.await
}
}
// copied from create_table.rs, but added
// copied from create_table.rs, but added new columns
#[derive(Iden)]
pub enum UserKeys {
Table,

@ -8,4 +8,4 @@ edition = "2021"
anyhow = "1.0.65"
deadpool-redis = { version = "0.10.2", features = ["rt_tokio_1", "serde"] }
tracing = "0.1.36"
tokio = "1.21.1"
tokio = "1.21.2"

@ -28,7 +28,6 @@ axum-macros = "0.2.3"
# TODO: import chrono from sea-orm so we always have the same version
chrono = "0.4.22"
counter = "0.5.6"
dashmap = "5.4.0"
derive_more = "0.99.17"
dotenv = "0.15.0"
ethers = { version = "0.17.0", features = ["rustls", "ws"] }
@ -60,7 +59,7 @@ serde_json = { version = "1.0.85", default-features = false, features = ["alloc"
serde_prometheus = "0.1.6"
# TODO: make sure this time version matches siwe. PR to put this in their prelude
time = "0.3.14"
tokio = { version = "1.21.1", features = ["full", "tracing"] }
tokio = { version = "1.21.2", features = ["full", "tracing"] }
# TODO: make sure this uuid version matches sea-orm. PR to put this in their prelude
tokio-stream = { version = "0.1.10", features = ["sync"] }
toml = "0.5.9"

@ -68,8 +68,10 @@ pub type AnyhowJoinHandle<T> = JoinHandle<anyhow::Result<T>>;
/// TODO: rename this?
pub struct UserKeyData {
pub user_key_id: u64,
/// if u64::MAX, allow unlimited queries
pub user_max_requests_per_period: Option<u64>,
/// if None, allow unlimited queries
pub max_requests_per_period: Option<u64>,
// if None, allow unlimited concurrent requests
pub max_concurrent_requests: Option<u64>,
/// if None, allow any Origin
pub allowed_origins: Option<Vec<String>>,
/// if None, allow any Referer

@ -11,7 +11,7 @@ use sea_orm::{prelude::Decimal, ColumnTrait, DatabaseConnection, EntityTrait, Qu
use serde::Serialize;
use std::fmt::Display;
use std::{net::IpAddr, str::FromStr, sync::Arc};
use tokio::sync::Semaphore;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::time::Instant;
use tracing::{error, trace};
use ulid::Ulid;
@ -92,9 +92,10 @@ impl From<UserKey> for Uuid {
#[derive(Debug)]
pub enum RateLimitResult {
/// contains the IP of the anonymous user
AllowedIp(IpAddr, Arc<Semaphore>),
/// TODO: option inside or outside the arc?
AllowedIp(IpAddr, OwnedSemaphorePermit),
/// contains the user_key_id of an authenticated user
AllowedUser(UserKeyData, Arc<Semaphore>),
AllowedUser(UserKeyData, Option<OwnedSemaphorePermit>),
/// contains the IP and retry_at of the anonymous user
RateLimitedIp(IpAddr, Option<Instant>),
/// contains the user_key_id and retry_at of an authenticated user key
@ -203,7 +204,7 @@ impl AuthorizedRequest {
pub async fn login_is_authorized(
app: &Web3ProxyApp,
ip: IpAddr,
) -> Result<(AuthorizedRequest, Arc<Semaphore>), FrontendErrorResponse> {
) -> Result<(AuthorizedRequest, OwnedSemaphorePermit), FrontendErrorResponse> {
// TODO: i think we could write an `impl From` for this
// TODO: move this to an AuthorizedUser extrator
let (ip, semaphore) = match app.rate_limit_login(ip).await? {
@ -225,7 +226,7 @@ pub async fn bearer_is_authorized(
origin: Option<Origin>,
referer: Option<Referer>,
user_agent: Option<UserAgent>,
) -> Result<(AuthorizedRequest, Arc<Semaphore>), FrontendErrorResponse> {
) -> Result<(AuthorizedRequest, Option<OwnedSemaphorePermit>), FrontendErrorResponse> {
let mut redis_conn = app.redis_conn().await.context("Getting redis connection")?;
// TODO: verify that bearer.token() is a Ulid?
@ -260,11 +261,11 @@ pub async fn bearer_is_authorized(
pub async fn ip_is_authorized(
app: &Web3ProxyApp,
ip: IpAddr,
) -> Result<(AuthorizedRequest, Arc<Semaphore>), FrontendErrorResponse> {
) -> Result<(AuthorizedRequest, Option<OwnedSemaphorePermit>), FrontendErrorResponse> {
// TODO: i think we could write an `impl From` for this
// TODO: move this to an AuthorizedUser extrator
let (ip, semaphore) = match app.rate_limit_by_ip(ip).await? {
RateLimitResult::AllowedIp(ip, semaphore) => (ip, semaphore),
RateLimitResult::AllowedIp(ip, semaphore) => (ip, Some(semaphore)),
RateLimitResult::RateLimitedIp(x, retry_at) => {
return Err(FrontendErrorResponse::RateLimitedIp(x, retry_at));
}
@ -272,6 +273,7 @@ pub async fn ip_is_authorized(
x => unimplemented!("rate_limit_by_ip shouldn't ever see these: {:?}", x),
};
// semaphore won't ever be None, but its easier if key auth and ip auth work the same way
Ok((AuthorizedRequest::Ip(ip), semaphore))
}
@ -282,7 +284,7 @@ pub async fn key_is_authorized(
origin: Option<Origin>,
referer: Option<Referer>,
user_agent: Option<UserAgent>,
) -> Result<(AuthorizedRequest, Arc<Semaphore>), FrontendErrorResponse> {
) -> Result<(AuthorizedRequest, Option<OwnedSemaphorePermit>), FrontendErrorResponse> {
// check the rate limits. error if over the limit
let (user_data, semaphore) = match app.rate_limit_by_key(user_key).await? {
RateLimitResult::AllowedUser(x, semaphore) => (x, semaphore),
@ -302,20 +304,50 @@ pub async fn key_is_authorized(
}
impl Web3ProxyApp {
pub async fn ip_semaphore(&self, ip: IpAddr) -> anyhow::Result<OwnedSemaphorePermit> {
let semaphore = self
.ip_semaphores
.get_with(ip, async move {
// TODO: get semaphore size from app config
let s = Semaphore::const_new(10);
Arc::new(s)
})
.await;
let semaphore_permit = semaphore.acquire_owned().await?;
Ok(semaphore_permit)
}
pub async fn user_key_semaphore(
&self,
user_data: &UserKeyData,
) -> anyhow::Result<Option<OwnedSemaphorePermit>> {
if let Some(max_concurrent_requests) = user_data.max_concurrent_requests {
let semaphore = self
.user_key_semaphores
.get_with(user_data.user_key_id, async move {
let s = Semaphore::const_new(max_concurrent_requests.try_into().unwrap());
trace!("new semaphore for user_key_id {}", user_data.user_key_id);
Arc::new(s)
})
.await;
let semaphore_permit = semaphore.acquire_owned().await?;
Ok(Some(semaphore_permit))
} else {
Ok(None)
}
}
pub async fn rate_limit_login(&self, ip: IpAddr) -> anyhow::Result<RateLimitResult> {
// TODO: dry this up with rate_limit_by_key
// TODO: have a local cache because if we hit redis too hard we get errors
// TODO: query redis in the background so that users don't have to wait on this network request
let semaphore = self.ip_semaphore(ip).await?;
if let Some(rate_limiter) = &self.login_rate_limiter {
match rate_limiter.throttle_label(&ip.to_string(), None, 1).await {
Ok(RedisRateLimitResult::Allowed(_)) => {
let semaphore = self
.ip_semaphores
.get_with(ip, async move {
todo!("write this (dry)");
})
.await;
Ok(RateLimitResult::AllowedIp(ip, semaphore))
}
Ok(RedisRateLimitResult::RetryAt(retry_at, _)) => {
@ -335,13 +367,6 @@ impl Web3ProxyApp {
// TODO: i really want axum to do this for us in a single place.
error!(?err, "login rate limiter is unhappy. allowing ip");
let semaphore = self
.ip_semaphores
.get_with(ip, async move {
todo!("write this (dry)");
})
.await;
Ok(RateLimitResult::AllowedIp(ip, semaphore))
}
}
@ -353,18 +378,11 @@ impl Web3ProxyApp {
pub async fn rate_limit_by_ip(&self, ip: IpAddr) -> anyhow::Result<RateLimitResult> {
// TODO: dry this up with rate_limit_by_key
// TODO: have a local cache because if we hit redis too hard we get errors
// TODO: query redis in the background so that users don't have to wait on this network request
let semaphore = self.ip_semaphore(ip).await?;
if let Some(rate_limiter) = &self.frontend_ip_rate_limiter {
match rate_limiter.throttle(ip, None, 1).await {
Ok(DeferredRateLimitResult::Allowed) => {
let semaphore = self
.ip_semaphores
.get_with(ip, async move {
todo!("write this (dry)");
})
.await;
Ok(RateLimitResult::AllowedIp(ip, semaphore))
}
Ok(DeferredRateLimitResult::RetryAt(retry_at)) => {
@ -384,13 +402,6 @@ impl Web3ProxyApp {
// TODO: i really want axum to do this for us in a single place.
error!(?err, "rate limiter is unhappy. allowing ip");
let semaphore = self
.ip_semaphores
.get_with(ip, async move {
todo!("write this (dry)");
})
.await;
Ok(RateLimitResult::AllowedIp(ip, semaphore))
}
}
@ -467,7 +478,8 @@ impl Web3ProxyApp {
Ok(UserKeyData {
user_key_id: user_key_model.id,
user_max_requests_per_period: user_key_model.requests_per_minute,
max_requests_per_period: user_key_model.requests_per_minute,
max_concurrent_requests: user_key_model.max_concurrent_requests,
allowed_ips,
allowed_origins,
allowed_referers,
@ -491,15 +503,10 @@ impl Web3ProxyApp {
return Ok(RateLimitResult::UnknownKey);
}
let user_max_requests_per_period = match user_data.user_max_requests_per_period {
None => {
let semaphore = self
.user_key_semaphores
.get_with(user_data.user_key_id, async move {
todo!("write this");
})
.await;
let semaphore = self.user_key_semaphore(&user_data).await?;
let user_max_requests_per_period = match user_data.max_requests_per_period {
None => {
return Ok(RateLimitResult::AllowedUser(user_data, semaphore));
}
Some(x) => x,
@ -512,13 +519,6 @@ impl Web3ProxyApp {
.await
{
Ok(DeferredRateLimitResult::Allowed) => {
let semaphore = self
.user_key_semaphores
.get_with(user_data.user_key_id, async move {
todo!("write this (dry)");
})
.await;
Ok(RateLimitResult::AllowedUser(user_data, semaphore))
}
Ok(DeferredRateLimitResult::RetryAt(retry_at)) => {
@ -539,13 +539,6 @@ impl Web3ProxyApp {
// TODO: i really want axum to do this for us in a single place.
error!(?err, "rate limiter is unhappy. allowing ip");
let semaphore = self
.user_key_semaphores
.get_with(user_data.user_key_id, async move {
todo!("write this (dry)");
})
.await;
Ok(RateLimitResult::AllowedUser(user_data, semaphore))
}
}

@ -41,7 +41,8 @@ pub async fn get_login(
// TODO: allow ENS names here?
Path(mut params): Path<HashMap<String, String>>,
) -> FrontendResult {
let _ = login_is_authorized(&app, ip).await?;
// give these named variables so that we drop them at the very end of this function
let (_, _semaphore) = login_is_authorized(&app, ip).await?;
// at first i thought about checking that user_address is in our db
// but theres no need to separate the registration and login flows
@ -145,7 +146,8 @@ pub async fn post_login(
Json(payload): Json<PostLogin>,
Query(query): Query<PostLoginQuery>,
) -> FrontendResult {
let _ = login_is_authorized(&app, ip).await?;
// give these named variables so that we drop them at the very end of this function
let (_, _semaphore) = login_is_authorized(&app, ip).await?;
if let Some(invite_code) = &app.config.invite_code {
// we don't do per-user referral codes because we shouldn't collect what we don't need.
@ -293,7 +295,8 @@ pub async fn post_user(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Json(payload): Json<PostUser>,
) -> FrontendResult {
let _ = login_is_authorized(&app, ip).await?;
// give these named variables so that we drop them at the very end of this function
let (_, _semaphore) = login_is_authorized(&app, ip).await?;
let user = ProtectedAction::PostUser
.verify(app.as_ref(), bearer_token, &payload.primary_address)
@ -303,7 +306,10 @@ pub async fn post_user(
// TODO: rate limit by user, too?
// TODO: allow changing the primary address, too. require a message from the new address to finish the change
if let Some(x) = payload.email {
// TODO: only Set if no change
if x.is_empty() {
user.email = sea_orm::Set(None);
} else {
@ -315,12 +321,6 @@ pub async fn post_user(
user.save(db).await?;
// let user = user::ActiveModel {
// address: sea_orm::Set(payload.address.to_fixed_bytes().into()),
// email: sea_orm::Set(payload.email),
// ..Default::default()
// };
todo!("finish post_user");
}