start adding semaphores

Bryan Stitt 2022-09-27 02:01:45 +00:00
parent 664cf93668
commit 0c7a2b05ec
6 changed files with 111 additions and 36 deletions

View File

@ -182,7 +182,6 @@ These are roughly in order of completion
- [ ] endpoint for creating/modifying api keys and their advanced security features
- [ ] BUG: i think if all backend servers stop, the server doesn't properly reconnect. It appears to stop listening on 8854 but does not shut down.
- [ ] option to rotate api key
- [ ] read the cookie key from a file. easy to re-use and no giant blob of hex in our app config
- [ ] if no bearer token found in redis (likely because it expired), send 401 unauthorized
- [ ] user create script should allow a description field
- [ ] user create script should allow multiple keys per user
@ -191,6 +190,7 @@ These are roughly in order of completion
These are not yet ordered.
- [ ] implement filters and other unimplemented rpc methods
- [ ] instead of Option<...> in our frontend function signatures, use Result and the try operator (`?`) so that our errors get wrapped in json
- [ ] revert logs should have a maximum age and a maximum count to keep the database from being huge
- [ ] user login should also return a jwt (jsonwebtoken rust crate should make it easy)
@ -229,7 +229,7 @@ These are not yet ordered.
- if total difficulty is not on the block and we aren't on ETH, fetch the full block instead of just the header
- if total difficulty is set and non-zero, use it for consensus instead of just the number
- [ ] if we subscribe to a server that is syncing, it gives us null block_data_limit. when it catches up, we don't ever send queries to it. we need to recheck block_data_limit
- [ ] we need concurrent request limits. these should NOT throw rate limit exceeded; instead, they should wait on a dashmap of semaphores. or maybe an unbounded cache of semaphores. if the request timeout is exceeded, then we can return a rate limit exceeded error
- [ ] we need concurrent request limits. these should NOT throw rate limit exceeded; instead, they should wait on a dashmap of semaphores. or maybe an unbounded cache of Arc<tokio::sync::Semaphore>s. if the request timeout is exceeded, then we can return a rate limit exceeded error (see the sketch below)
new endpoints for users (not totally sure about the exact paths, but these features are all needed):
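
A rough sketch of the concurrent-request TODO above, not code from this commit: each request waits on a per-IP semaphore pulled lazily from a cache, and a wait that outlives the request timeout is reported as a rate limit error. `wait_for_slot`, `MAX_CONCURRENT`, and `REQUEST_TIMEOUT` are made-up names and values for illustration.

```rust
use std::{net::IpAddr, sync::Arc, time::Duration};

use moka::future::Cache;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::time::timeout;

// hypothetical limits; real values would come from config
const MAX_CONCURRENT: usize = 10;
const REQUEST_TIMEOUT: Duration = Duration::from_secs(60);

async fn wait_for_slot(
    semaphores: &Cache<IpAddr, Arc<Semaphore>>,
    ip: IpAddr,
) -> anyhow::Result<OwnedSemaphorePermit> {
    // lazily create one semaphore per ip. idle entries can age out of the cache
    let semaphore = semaphores
        .get_with(ip, async { Arc::new(Semaphore::new(MAX_CONCURRENT)) })
        .await;

    // wait for a free slot instead of rejecting immediately...
    match timeout(REQUEST_TIMEOUT, semaphore.acquire_owned()).await {
        Ok(permit) => Ok(permit?),
        // ...but if the request timeout passes first, surface a rate limit error
        Err(_elapsed) => Err(anyhow::anyhow!("rate limit exceeded")),
    }
}
```

The returned permit would be held for the lifetime of the proxied request; dropping it frees the slot for the next waiter.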

View File

@ -41,7 +41,7 @@ use std::str::FromStr;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, watch};
use tokio::sync::{broadcast, watch, Semaphore};
use tokio::task::JoinHandle;
use tokio::time::timeout;
use tokio_stream::wrappers::{BroadcastStream, WatchStream};
@ -106,7 +106,9 @@ pub struct Web3ProxyApp {
pub frontend_key_rate_limiter: Option<DeferredRateLimiter<Ulid>>,
pub login_rate_limiter: Option<RedisRateLimiter>,
pub redis_pool: Option<RedisPool>,
pub user_cache: Cache<Ulid, UserKeyData, hashbrown::hash_map::DefaultHashBuilder>,
pub user_key_cache: Cache<Ulid, UserKeyData, hashbrown::hash_map::DefaultHashBuilder>,
pub user_key_semaphores: Cache<u64, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
pub ip_semaphores: Cache<IpAddr, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
}
/// flatten a JoinError into an anyhow error
@ -380,11 +382,19 @@ impl Web3ProxyApp {
// all the users are the same size, so no need for a weigher
// TODO: max_capacity from config
// TODO: ttl from config
let user_cache = Cache::builder()
let user_key_cache = Cache::builder()
.max_capacity(10_000)
.time_to_live(Duration::from_secs(60))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new());
// TODO: what should tti be for semaphores?
let user_key_semaphores = Cache::builder()
.time_to_idle(Duration::from_secs(120))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new());
let ip_semaphores = Cache::builder()
.time_to_idle(Duration::from_secs(120))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new());
let app = Self {
config: top_config.app,
balanced_rpcs,
@ -400,7 +410,9 @@ impl Web3ProxyApp {
redis_pool,
app_metrics,
open_request_handle_metrics,
user_cache,
user_key_cache,
user_key_semaphores,
ip_semaphores,
};
let app = Arc::new(app);
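
A small standalone sketch (not from this commit) of why `Cache<_, Arc<Semaphore>>` with `time_to_idle` fits here: concurrent `get_with` calls for the same key all receive clones of the same `Arc`, so every request from one IP (or user key) contends on a single semaphore, while entries that sit idle for two minutes are evicted so the caches stay small. The 10-permit limit is a placeholder.

```rust
use std::{net::IpAddr, sync::Arc, time::Duration};

use moka::future::Cache;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let ip_semaphores: Cache<IpAddr, Arc<Semaphore>> = Cache::builder()
        .time_to_idle(Duration::from_secs(120))
        .build();

    let ip: IpAddr = "127.0.0.1".parse().unwrap();

    // only the first init future runs; later callers get the cached Arc
    let a = ip_semaphores
        .get_with(ip, async { Arc::new(Semaphore::new(10)) })
        .await;
    let b = ip_semaphores
        .get_with(ip, async { Arc::new(Semaphore::new(10)) })
        .await;

    // both handles point at the same semaphore, so permits are shared
    assert!(Arc::ptr_eq(&a, &b));
}
```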

View File

@ -11,6 +11,7 @@ use sea_orm::{prelude::Decimal, ColumnTrait, DatabaseConnection, EntityTrait, Qu
use serde::Serialize;
use std::fmt::Display;
use std::{net::IpAddr, str::FromStr, sync::Arc};
use tokio::sync::Semaphore;
use tokio::time::Instant;
use tracing::{error, trace};
use ulid::Ulid;
@ -91,9 +92,9 @@ impl From<UserKey> for Uuid {
#[derive(Debug)]
pub enum RateLimitResult {
/// contains the IP of the anonymous user
AllowedIp(IpAddr),
AllowedIp(IpAddr, Arc<Semaphore>),
/// contains the user_key_id of an authenticated user
AllowedUser(UserKeyData),
AllowedUser(UserKeyData, Arc<Semaphore>),
/// contains the IP and retry_at of the anonymous user
RateLimitedIp(IpAddr, Option<Instant>),
/// contains the user_key_id and retry_at of an authenticated user key
@ -202,11 +203,11 @@ impl AuthorizedRequest {
pub async fn login_is_authorized(
app: &Web3ProxyApp,
ip: IpAddr,
) -> Result<AuthorizedRequest, FrontendErrorResponse> {
) -> Result<(AuthorizedRequest, Arc<Semaphore>), FrontendErrorResponse> {
// TODO: i think we could write an `impl From` for this
// TODO: move this to an AuthorizedUser extractor
let ip = match app.rate_limit_login(ip).await? {
RateLimitResult::AllowedIp(x) => x,
let (ip, semaphore) = match app.rate_limit_login(ip).await? {
RateLimitResult::AllowedIp(x, semaphore) => (x, semaphore),
RateLimitResult::RateLimitedIp(x, retry_at) => {
return Err(FrontendErrorResponse::RateLimitedIp(x, retry_at));
}
@ -214,7 +215,7 @@ pub async fn login_is_authorized(
x => unimplemented!("rate_limit_login shouldn't ever see these: {:?}", x),
};
Ok(AuthorizedRequest::Ip(ip))
Ok((AuthorizedRequest::Ip(ip), semaphore))
}
pub async fn bearer_is_authorized(
@ -224,7 +225,7 @@ pub async fn bearer_is_authorized(
origin: Option<Origin>,
referer: Option<Referer>,
user_agent: Option<UserAgent>,
) -> Result<AuthorizedRequest, FrontendErrorResponse> {
) -> Result<(AuthorizedRequest, Arc<Semaphore>), FrontendErrorResponse> {
let mut redis_conn = app.redis_conn().await.context("Getting redis connection")?;
// TODO: verify that bearer.token() is a Ulid?
@ -259,11 +260,11 @@ pub async fn bearer_is_authorized(
pub async fn ip_is_authorized(
app: &Web3ProxyApp,
ip: IpAddr,
) -> Result<AuthorizedRequest, FrontendErrorResponse> {
) -> Result<(AuthorizedRequest, Arc<Semaphore>), FrontendErrorResponse> {
// TODO: i think we could write an `impl From` for this
// TODO: move this to an AuthorizedUser extractor
let ip = match app.rate_limit_by_ip(ip).await? {
RateLimitResult::AllowedIp(x) => x,
let (ip, semaphore) = match app.rate_limit_by_ip(ip).await? {
RateLimitResult::AllowedIp(ip, semaphore) => (ip, semaphore),
RateLimitResult::RateLimitedIp(x, retry_at) => {
return Err(FrontendErrorResponse::RateLimitedIp(x, retry_at));
}
@ -271,7 +272,7 @@ pub async fn ip_is_authorized(
x => unimplemented!("rate_limit_by_ip shouldn't ever see these: {:?}", x),
};
Ok(AuthorizedRequest::Ip(ip))
Ok((AuthorizedRequest::Ip(ip), semaphore))
}
pub async fn key_is_authorized(
@ -281,10 +282,10 @@ pub async fn key_is_authorized(
origin: Option<Origin>,
referer: Option<Referer>,
user_agent: Option<UserAgent>,
) -> Result<AuthorizedRequest, FrontendErrorResponse> {
) -> Result<(AuthorizedRequest, Arc<Semaphore>), FrontendErrorResponse> {
// check the rate limits. error if over the limit
let user_data = match app.rate_limit_by_key(user_key).await? {
RateLimitResult::AllowedUser(x) => x,
let (user_data, semaphore) = match app.rate_limit_by_key(user_key).await? {
RateLimitResult::AllowedUser(x, semaphore) => (x, semaphore),
RateLimitResult::RateLimitedUser(x, retry_at) => {
return Err(FrontendErrorResponse::RateLimitedUser(x, retry_at));
}
@ -297,7 +298,7 @@ pub async fn key_is_authorized(
let db = app.db_conn.clone();
Ok(AuthorizedRequest::User(db, authorized_user))
Ok((AuthorizedRequest::User(db, authorized_user), semaphore))
}
impl Web3ProxyApp {
@ -307,7 +308,16 @@ impl Web3ProxyApp {
// TODO: query redis in the background so that users don't have to wait on this network request
if let Some(rate_limiter) = &self.login_rate_limiter {
match rate_limiter.throttle_label(&ip.to_string(), None, 1).await {
Ok(RedisRateLimitResult::Allowed(_)) => Ok(RateLimitResult::AllowedIp(ip)),
Ok(RedisRateLimitResult::Allowed(_)) => {
let semaphore = self
.ip_semaphores
.get_with(ip, async move {
todo!("write this (dry)");
})
.await;
Ok(RateLimitResult::AllowedIp(ip, semaphore))
}
Ok(RedisRateLimitResult::RetryAt(retry_at, _)) => {
// TODO: set headers so they know when they can retry
// TODO: debug or trace?
@ -324,7 +334,15 @@ impl Web3ProxyApp {
// internal error, not rate limit being hit
// TODO: i really want axum to do this for us in a single place.
error!(?err, "login rate limiter is unhappy. allowing ip");
Ok(RateLimitResult::AllowedIp(ip))
let semaphore = self
.ip_semaphores
.get_with(ip, async move {
todo!("write this (dry)");
})
.await;
Ok(RateLimitResult::AllowedIp(ip, semaphore))
}
}
} else {
@ -339,7 +357,16 @@ impl Web3ProxyApp {
// TODO: query redis in the background so that users don't have to wait on this network request
if let Some(rate_limiter) = &self.frontend_ip_rate_limiter {
match rate_limiter.throttle(ip, None, 1).await {
Ok(DeferredRateLimitResult::Allowed) => Ok(RateLimitResult::AllowedIp(ip)),
Ok(DeferredRateLimitResult::Allowed) => {
let semaphore = self
.ip_semaphores
.get_with(ip, async move {
todo!("write this (dry)");
})
.await;
Ok(RateLimitResult::AllowedIp(ip, semaphore))
}
Ok(DeferredRateLimitResult::RetryAt(retry_at)) => {
// TODO: set headers so they know when they can retry
// TODO: debug or trace?
@ -356,7 +383,15 @@ impl Web3ProxyApp {
// internal error, not rate limit being hit
// TODO: i really want axum to do this for us in a single place.
error!(?err, "rate limiter is unhappy. allowing ip");
Ok(RateLimitResult::AllowedIp(ip))
let semaphore = self
.ip_semaphores
.get_with(ip, async move {
todo!("write this (dry)");
})
.await;
Ok(RateLimitResult::AllowedIp(ip, semaphore))
}
}
} else {
@ -368,7 +403,7 @@ impl Web3ProxyApp {
// check the local cache for user data, or query the database
pub(crate) async fn user_data(&self, user_key: UserKey) -> anyhow::Result<UserKeyData> {
let user_data: Result<_, Arc<anyhow::Error>> = self
.user_cache
.user_key_cache
.try_get_with(user_key.into(), async move {
trace!(?user_key, "user_cache miss");
@ -457,7 +492,16 @@ impl Web3ProxyApp {
}
let user_max_requests_per_period = match user_data.user_max_requests_per_period {
None => return Ok(RateLimitResult::AllowedUser(user_data)),
None => {
let semaphore = self
.user_key_semaphores
.get_with(user_data.user_key_id, async move {
todo!("write this");
})
.await;
return Ok(RateLimitResult::AllowedUser(user_data, semaphore));
}
Some(x) => x,
};
@ -467,7 +511,16 @@ impl Web3ProxyApp {
.throttle(user_key.into(), Some(user_max_requests_per_period), 1)
.await
{
Ok(DeferredRateLimitResult::Allowed) => Ok(RateLimitResult::AllowedUser(user_data)),
Ok(DeferredRateLimitResult::Allowed) => {
let semaphore = self
.user_key_semaphores
.get_with(user_data.user_key_id, async move {
todo!("write this (dry)");
})
.await;
Ok(RateLimitResult::AllowedUser(user_data, semaphore))
}
Ok(DeferredRateLimitResult::RetryAt(retry_at)) => {
// TODO: set headers so they know when they can retry
// TODO: debug or trace?
@ -485,7 +538,15 @@ impl Web3ProxyApp {
// internal error, not rate limit being hit
// TODO: i really want axum to do this for us in a single place.
error!(?err, "rate limiter is unhappy. allowing ip");
Ok(RateLimitResult::AllowedUser(user_data))
let semaphore = self
.user_key_semaphores
.get_with(user_data.user_key_id, async move {
todo!("write this (dry)");
})
.await;
Ok(RateLimitResult::AllowedUser(user_data, semaphore))
}
}
} else {

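Every `todo!("write this (dry)")` above marks the same missing piece: fetch (or lazily create) the shared semaphore for the key being rate limited. One possible shape for the DRY helpers, assuming the `ip_semaphores` and `user_key_semaphores` fields added to `Web3ProxyApp` earlier in this commit; the method names and permit counts are hypothetical, not part of the commit:

```rust
impl Web3ProxyApp {
    // hypothetical helper: one shared semaphore per anonymous ip
    async fn ip_semaphore(&self, ip: IpAddr) -> Arc<Semaphore> {
        self.ip_semaphores
            .get_with(ip, async {
                // placeholder limit; a real value would come from config
                Arc::new(Semaphore::new(10))
            })
            .await
    }

    // hypothetical helper: one shared semaphore per user key
    async fn user_key_semaphore(
        &self,
        user_key_id: u64,
        max_concurrent: usize,
    ) -> Arc<Semaphore> {
        self.user_key_semaphores
            .get_with(user_key_id, async move {
                Arc::new(Semaphore::new(max_concurrent))
            })
            .await
    }
}
```

Each `todo!()` call site would then collapse to `self.ip_semaphore(ip).await` or `self.user_key_semaphore(user_data.user_key_id, limit).await`, and callers could later `acquire_owned()` a permit with a timeout.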
View File

@ -21,7 +21,8 @@ pub async fn proxy_web3_rpc(
) -> FrontendResult {
let request_span = error_span!("request", %ip, ?referer, ?user_agent);
let authorized_request = if let Some(TypedHeader(Authorization(bearer))) = bearer {
let (authorized_request, _semaphore) = if let Some(TypedHeader(Authorization(bearer))) = bearer
{
let origin = origin.map(|x| x.0);
let referer = referer.map(|x| x.0);
let user_agent = user_agent.map(|x| x.0);
@ -64,7 +65,7 @@ pub async fn proxy_web3_rpc_with_key(
let request_span = error_span!("request", %ip, ?referer, ?user_agent);
// TODO: this should probably return the user_key_id instead? or maybe both?
let authorized_request = key_is_authorized(
let (authorized_request, _semaphore) = key_is_authorized(
&app,
user_key,
ip,

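A side note on the `let (authorized_request, _semaphore) = ...` pattern used in these handlers: a leading-underscore *name* keeps the value bound until the end of the handler, while a bare `_` pattern drops it immediately. That distinction matters once a permit (rather than just the `Arc<Semaphore>`) is held for the duration of the request. A tiny standalone demonstration, not from this codebase:

```rust
use std::sync::Arc;

use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(1));

    {
        // bound to a name: the permit lives until the end of this scope
        let _permit = semaphore.clone().acquire_owned().await.unwrap();
        assert_eq!(semaphore.available_permits(), 0);
    } // `_permit` dropped here, releasing the slot

    // bare `_` never binds: the permit is dropped as soon as it is created
    let _ = semaphore.clone().acquire_owned().await.unwrap();
    assert_eq!(semaphore.available_permits(), 1);
}
```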
View File

@ -40,7 +40,8 @@ pub async fn websocket_handler(
) -> FrontendResult {
let request_span = error_span!("request", %ip, ?referer, ?user_agent);
let authorized_request = if let Some(TypedHeader(Authorization(bearer))) = bearer {
let (authorized_request, _semaphore) = if let Some(TypedHeader(Authorization(bearer))) = bearer
{
let origin = origin.map(|x| x.0);
let referer = referer.map(|x| x.0);
let user_agent = user_agent.map(|x| x.0);
@ -85,7 +86,7 @@ pub async fn websocket_handler_with_key(
let request_span = error_span!("request", %ip, ?referer, ?user_agent);
let authorized_request = key_is_authorized(
let (authorized_request, _semaphore) = key_is_authorized(
&app,
user_key,
ip,

View File

@ -24,14 +24,14 @@ pub async fn prometheus(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl In
/// TODO: replace this with proper stats and monitoring
pub async fn status(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
app.pending_transactions.sync();
app.user_cache.sync();
app.user_key_cache.sync();
// TODO: what else should we include? uptime, cache hit rates, cpu load
let body = json!({
"pending_transactions_count": app.pending_transactions.entry_count(),
"pending_transactions_size": app.pending_transactions.weighted_size(),
"user_cache_count": app.user_cache.entry_count(),
"user_cache_size": app.user_cache.weighted_size(),
"user_cache_count": app.user_key_cache.entry_count(),
"user_cache_size": app.user_key_cache.weighted_size(),
"balanced_rpcs": app.balanced_rpcs,
"private_rpcs": app.private_rpcs,
});