create a struct for authenticated requests that we need for per-key stats
This commit is contained in:
parent
8481f6d44c
commit
6905e9fd46
22
Cargo.lock
generated
22
Cargo.lock
generated
@ -100,9 +100,9 @@ checksum = "983cd8b9d4b02a6dc6ffa557262eb5858a27a0038ffffe21a0f133eaa819a164"
|
||||
|
||||
[[package]]
|
||||
name = "argh"
|
||||
version = "0.1.8"
|
||||
version = "0.1.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a7e7e4aa7e40747e023c0761dafcb42333a9517575bbf1241747f68dd3177a62"
|
||||
checksum = "c375edecfd2074d5edcc31396860b6e54b6f928714d0e097b983053fac0cabe3"
|
||||
dependencies = [
|
||||
"argh_derive",
|
||||
"argh_shared",
|
||||
@ -110,12 +110,12 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "argh_derive"
|
||||
version = "0.1.8"
|
||||
version = "0.1.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "69f2bd7ff6ed6414f4e5521bd509bae46454bbd513801767ced3f21a751ab4bc"
|
||||
checksum = "aa013479b80109a1bf01a039412b0f0013d716f36921226d86c6709032fb7a03"
|
||||
dependencies = [
|
||||
"argh_shared",
|
||||
"heck 0.3.3",
|
||||
"heck 0.4.0",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
@ -123,9 +123,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "argh_shared"
|
||||
version = "0.1.8"
|
||||
version = "0.1.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "47253b98986dafc7a3e1cf3259194f1f47ac61abb57a57f46ec09e48d004ecda"
|
||||
checksum = "149f75bbec1827618262e0855a68f0f9a7f2edc13faebf33c4f16d6725edb6a9"
|
||||
|
||||
[[package]]
|
||||
name = "arrayvec"
|
||||
@ -4276,9 +4276,9 @@ checksum = "930c0acf610d3fdb5e2ab6213019aaa04e227ebe9547b0649ba599b16d788bd7"
|
||||
|
||||
[[package]]
|
||||
name = "serde"
|
||||
version = "1.0.144"
|
||||
version = "1.0.145"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0f747710de3dcd43b88c9168773254e809d8ddbdf9653b84e2554ab219f17860"
|
||||
checksum = "728eb6351430bccb993660dfffc5a72f91ccc1295abaa8ce19b27ebe4f75568b"
|
||||
dependencies = [
|
||||
"serde_derive",
|
||||
]
|
||||
@ -4295,9 +4295,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "serde_derive"
|
||||
version = "1.0.144"
|
||||
version = "1.0.145"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "94ed3a816fb1d101812f83e789f888322c34e291f894f19590dc310963e87a00"
|
||||
checksum = "81fa1584d3d1bcacd84c277a0dfe21f5b0f6accf4a23d04d4c6d61f1af522b4c"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
|
@ -3,6 +3,7 @@ chain_id = 1
|
||||
db_url = "mysql://root:dev_web3_proxy@dev-db:3306/dev_web3_proxy"
|
||||
# TODO: how do we find the optimal db_max_connections? too high actually ends up being slower
|
||||
db_max_connections = 99
|
||||
influxdb_url = "http://influxdb:8086"
|
||||
min_sum_soft_limit = 2000
|
||||
min_synced_rpcs = 2
|
||||
redis_url = "redis://dev-redis:6379/"
|
||||
|
@ -20,6 +20,22 @@ services:
|
||||
volumes:
|
||||
- ./data/dev_mysql:/var/lib/mysql
|
||||
|
||||
dev-influxdb:
|
||||
image: influxdb:latest
|
||||
ports:
|
||||
- '127.0.0.1:18086:8086'
|
||||
volumes:
|
||||
- ./data/dev_influxdb:/var/lib/influxdb
|
||||
environment:
|
||||
- INFLUXDB_DB=db0
|
||||
- INFLUXDB_ADMIN_USER=admin
|
||||
- INFLUXDB_ADMIN_PASSWORD=dev_web3_proxy
|
||||
|
||||
dev-otel-collector:
|
||||
image: otel/opentelemetry-collector-dev:latest
|
||||
expose:
|
||||
- 4317
|
||||
|
||||
dev-adminer:
|
||||
image: adminer
|
||||
ports:
|
||||
|
@ -11,5 +11,5 @@ path = "src/mod.rs"
|
||||
|
||||
[dependencies]
|
||||
sea-orm = "0.9.2"
|
||||
serde = "1.0.144"
|
||||
serde = "1.0.145"
|
||||
uuid = "1.1.2"
|
||||
|
@ -21,7 +21,7 @@ redis-rate-limiter = { path = "../redis-rate-limiter" }
|
||||
|
||||
anyhow = { version = "1.0.65", features = ["backtrace"] }
|
||||
arc-swap = "1.5.1"
|
||||
argh = "0.1.8"
|
||||
argh = "0.1.9"
|
||||
axum = { version = "0.5.16", features = ["headers", "serde_json", "tokio-tungstenite", "ws"] }
|
||||
axum-auth = "0.3.0"
|
||||
axum-client-ip = "0.2.0"
|
||||
@ -51,7 +51,7 @@ handlebars = "4.3.4"
|
||||
rustc-hash = "1.1.0"
|
||||
siwe = "0.4.2"
|
||||
sea-orm = { version = "0.9.2", features = ["macros"] }
|
||||
serde = { version = "1.0.144", features = [] }
|
||||
serde = { version = "1.0.145", features = [] }
|
||||
serde_json = { version = "1.0.85", default-features = false, features = ["alloc", "raw_value"] }
|
||||
serde_prometheus = "0.1.6"
|
||||
# TODO: make sure this time version matches siwe. PR to put this in their prelude
|
||||
@ -59,7 +59,7 @@ time = "0.3.14"
|
||||
tokio = { version = "1.21.1", features = ["full", "tracing"] }
|
||||
# TODO: make sure this uuid version matches sea-orm. PR to put this in their prelude
|
||||
tokio-stream = { version = "0.1.10", features = ["sync"] }
|
||||
tower-cookies = "0.7"
|
||||
tower-cookies = "0.7.0"
|
||||
toml = "0.5.9"
|
||||
tower = "0.4.13"
|
||||
tower-request-id = "0.2.0"
|
||||
|
@ -41,7 +41,7 @@ use tokio::sync::{broadcast, watch};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::timeout;
|
||||
use tokio_stream::wrappers::{BroadcastStream, WatchStream};
|
||||
use tracing::{error, info, info_span, instrument, trace, warn, Instrument};
|
||||
use tracing::{error, info, trace, warn};
|
||||
use uuid::Uuid;
|
||||
|
||||
// TODO: make this customizable?
|
||||
@ -60,9 +60,10 @@ type ResponseCache =
|
||||
|
||||
pub type AnyhowJoinHandle<T> = JoinHandle<anyhow::Result<T>>;
|
||||
|
||||
#[derive(Clone, Copy, From)]
|
||||
pub struct UserCacheValue {
|
||||
pub user_id: u64,
|
||||
#[derive(Clone, Copy, Debug, From, Serialize)]
|
||||
/// TODO: rename this?
|
||||
pub struct UserData {
|
||||
pub user_key_id: u64,
|
||||
/// if None, allow unlimited queries
|
||||
pub user_count_per_period: Option<u64>,
|
||||
}
|
||||
@ -90,7 +91,7 @@ pub struct Web3ProxyApp {
|
||||
pub frontend_ip_rate_limiter: Option<DeferredRateLimiter<IpAddr>>,
|
||||
pub frontend_key_rate_limiter: Option<DeferredRateLimiter<Uuid>>,
|
||||
pub redis_pool: Option<RedisPool>,
|
||||
pub user_cache: Cache<Uuid, UserCacheValue, hashbrown::hash_map::DefaultHashBuilder>,
|
||||
pub user_cache: Cache<Uuid, UserData, hashbrown::hash_map::DefaultHashBuilder>,
|
||||
}
|
||||
|
||||
/// flatten a JoinError into an anyhow error
|
||||
|
@ -9,7 +9,6 @@ use hashbrown::HashMap;
|
||||
use serde::Deserialize;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::broadcast;
|
||||
use tracing::instrument;
|
||||
|
||||
pub type BlockAndRpc = (Option<ArcBlock>, Arc<Web3Connection>);
|
||||
pub type TxHashAndRpc = (TxHash, Arc<Web3Connection>);
|
||||
@ -53,6 +52,7 @@ pub struct AppConfig {
|
||||
/// minimum size of the connection pool for the database
|
||||
/// If none, the minimum * 2 is used
|
||||
pub db_max_connections: Option<u32>,
|
||||
pub influxdb_url: Option<String>,
|
||||
#[serde(default = "default_default_requests_per_minute")]
|
||||
pub default_requests_per_minute: u64,
|
||||
pub invite_code: Option<String>,
|
||||
|
@ -1,4 +1,4 @@
|
||||
use crate::jsonrpc::JsonRpcForwardedResponse;
|
||||
use crate::{app::UserData, jsonrpc::JsonRpcForwardedResponse};
|
||||
use axum::{
|
||||
http::StatusCode,
|
||||
response::{IntoResponse, Response},
|
||||
@ -9,7 +9,7 @@ use redis_rate_limiter::redis::RedisError;
|
||||
use sea_orm::DbErr;
|
||||
use std::{error::Error, net::IpAddr};
|
||||
use tokio::time::Instant;
|
||||
use tracing::{instrument, warn};
|
||||
use tracing::warn;
|
||||
|
||||
// TODO: take "IntoResult" instead?
|
||||
pub type FrontendResult = Result<Response, FrontendErrorResponse>;
|
||||
@ -21,7 +21,7 @@ pub enum FrontendErrorResponse {
|
||||
Redis(RedisError),
|
||||
Response(Response),
|
||||
Database(DbErr),
|
||||
RateLimitedUser(u64, Option<Instant>),
|
||||
RateLimitedUser(UserData, Option<Instant>),
|
||||
RateLimitedIp(IpAddr, Option<Instant>),
|
||||
UnknownKey,
|
||||
NotFound,
|
||||
@ -95,13 +95,14 @@ impl IntoResponse for FrontendErrorResponse {
|
||||
)
|
||||
}
|
||||
// TODO: this should actually by the id of the key. multiple users might control one key
|
||||
Self::RateLimitedUser(user_id, retry_at) => {
|
||||
Self::RateLimitedUser(user_data, retry_at) => {
|
||||
// TODO: emit a stat
|
||||
// TODO: include retry_at in the error
|
||||
(
|
||||
StatusCode::TOO_MANY_REQUESTS,
|
||||
JsonRpcForwardedResponse::from_string(
|
||||
format!("too many requests from user {}!", user_id),
|
||||
// TODO: better error
|
||||
format!("too many requests from {:?}!", user_data),
|
||||
Some(StatusCode::TOO_MANY_REQUESTS.as_u16().into()),
|
||||
None,
|
||||
),
|
||||
|
@ -3,7 +3,6 @@ use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
|
||||
use moka::future::ConcurrentCacheExt;
|
||||
use serde_json::json;
|
||||
use std::sync::Arc;
|
||||
use tracing::instrument;
|
||||
|
||||
/// Health check page for load balancers to use
|
||||
pub async fn health(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
|
||||
|
@ -1,6 +1,6 @@
|
||||
mod authorization;
|
||||
mod errors;
|
||||
mod http;
|
||||
mod rate_limit;
|
||||
mod rpc_proxy_http;
|
||||
mod rpc_proxy_ws;
|
||||
mod users;
|
||||
|
@ -1,185 +0,0 @@
|
||||
use super::errors::FrontendErrorResponse;
|
||||
use crate::app::{UserCacheValue, Web3ProxyApp};
|
||||
use anyhow::Context;
|
||||
use deferred_rate_limiter::DeferredRateLimitResult;
|
||||
use entities::user_keys;
|
||||
use sea_orm::{
|
||||
ColumnTrait, DeriveColumn, EntityTrait, EnumIter, IdenStatic, QueryFilter, QuerySelect,
|
||||
};
|
||||
use std::{net::IpAddr, sync::Arc};
|
||||
use tokio::time::Instant;
|
||||
use tracing::{error, trace};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum RateLimitResult {
|
||||
AllowedIp(IpAddr),
|
||||
AllowedUser(u64),
|
||||
RateLimitedIp(IpAddr, Option<Instant>),
|
||||
RateLimitedUser(u64, Option<Instant>),
|
||||
UnknownKey,
|
||||
}
|
||||
|
||||
pub async fn rate_limit_by_ip(
|
||||
app: &Web3ProxyApp,
|
||||
ip: IpAddr,
|
||||
) -> Result<IpAddr, FrontendErrorResponse> {
|
||||
match app.rate_limit_by_ip(ip).await? {
|
||||
RateLimitResult::AllowedIp(x) => Ok(x),
|
||||
RateLimitResult::RateLimitedIp(x, retry_at) => {
|
||||
Err(FrontendErrorResponse::RateLimitedIp(x, retry_at))
|
||||
}
|
||||
// TODO: don't panic. give the user an error
|
||||
x => unimplemented!("rate_limit_by_ip shouldn't ever see these: {:?}", x),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn rate_limit_by_key(
|
||||
app: &Web3ProxyApp,
|
||||
// TODO: change this to a Ulid
|
||||
user_key: Uuid,
|
||||
) -> Result<u64, FrontendErrorResponse> {
|
||||
match app.rate_limit_by_key(user_key).await? {
|
||||
RateLimitResult::AllowedUser(x) => Ok(x),
|
||||
RateLimitResult::RateLimitedUser(x, retry_at) => {
|
||||
Err(FrontendErrorResponse::RateLimitedUser(x, retry_at))
|
||||
}
|
||||
RateLimitResult::UnknownKey => Err(FrontendErrorResponse::UnknownKey),
|
||||
// TODO: don't panic. give the user an error
|
||||
x => unimplemented!("rate_limit_by_key shouldn't ever see these: {:?}", x),
|
||||
}
|
||||
}
|
||||
|
||||
impl Web3ProxyApp {
|
||||
pub async fn rate_limit_by_ip(&self, ip: IpAddr) -> anyhow::Result<RateLimitResult> {
|
||||
// TODO: dry this up with rate_limit_by_key
|
||||
// TODO: have a local cache because if we hit redis too hard we get errors
|
||||
// TODO: query redis in the background so that users don't have to wait on this network request
|
||||
if let Some(rate_limiter) = &self.frontend_ip_rate_limiter {
|
||||
match rate_limiter.throttle(ip, None, 1).await {
|
||||
Ok(DeferredRateLimitResult::Allowed) => Ok(RateLimitResult::AllowedIp(ip)),
|
||||
Ok(DeferredRateLimitResult::RetryAt(retry_at)) => {
|
||||
// TODO: set headers so they know when they can retry
|
||||
// TODO: debug or trace?
|
||||
// this is too verbose, but a stat might be good
|
||||
trace!(?ip, "rate limit exceeded until {:?}", retry_at);
|
||||
Ok(RateLimitResult::RateLimitedIp(ip, Some(retry_at)))
|
||||
}
|
||||
Ok(DeferredRateLimitResult::RetryNever) => {
|
||||
// TODO: i don't think we'll get here. maybe if we ban an IP forever? seems unlikely
|
||||
trace!(?ip, "rate limit is 0");
|
||||
Ok(RateLimitResult::RateLimitedIp(ip, None))
|
||||
}
|
||||
Err(err) => {
|
||||
// internal error, not rate limit being hit
|
||||
// TODO: i really want axum to do this for us in a single place.
|
||||
error!(?err, "rate limiter is unhappy. allowing ip");
|
||||
Ok(RateLimitResult::AllowedIp(ip))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// TODO: if no redis, rate limit with a local cache? "warn!" probably isn't right
|
||||
todo!("no rate limiter");
|
||||
}
|
||||
}
|
||||
|
||||
// check the local cache for user data, or query the database
|
||||
pub(crate) async fn user_data(&self, user_key: Uuid) -> anyhow::Result<UserCacheValue> {
|
||||
let db = self.db_conn.as_ref().context("no database")?;
|
||||
|
||||
let user_data: Result<_, Arc<anyhow::Error>> = self
|
||||
.user_cache
|
||||
.try_get_with(user_key, async move {
|
||||
trace!(?user_key, "user_cache miss");
|
||||
|
||||
/// helper enum for querying just a few columns instead of the entire table
|
||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
|
||||
enum QueryAs {
|
||||
UserId,
|
||||
RequestsPerMinute,
|
||||
}
|
||||
|
||||
// TODO: join the user table to this to return the User? we don't always need it
|
||||
match user_keys::Entity::find()
|
||||
.select_only()
|
||||
.column_as(user_keys::Column::UserId, QueryAs::UserId)
|
||||
.column_as(
|
||||
user_keys::Column::RequestsPerMinute,
|
||||
QueryAs::RequestsPerMinute,
|
||||
)
|
||||
.filter(user_keys::Column::ApiKey.eq(user_key))
|
||||
.filter(user_keys::Column::Active.eq(true))
|
||||
.into_values::<_, QueryAs>()
|
||||
.one(db)
|
||||
.await?
|
||||
{
|
||||
Some((user_id, requests_per_minute)) => {
|
||||
// TODO: add a column here for max, or is u64::MAX fine?
|
||||
let user_count_per_period = if requests_per_minute == u64::MAX {
|
||||
None
|
||||
} else {
|
||||
Some(requests_per_minute)
|
||||
};
|
||||
|
||||
Ok(UserCacheValue::from((user_id, user_count_per_period)))
|
||||
}
|
||||
None => Ok(UserCacheValue::from((0, Some(0)))),
|
||||
}
|
||||
})
|
||||
.await;
|
||||
|
||||
// TODO: i'm not actually sure about this expect
|
||||
user_data.map_err(|err| Arc::try_unwrap(err).expect("this should be the only reference"))
|
||||
}
|
||||
|
||||
pub async fn rate_limit_by_key(&self, user_key: Uuid) -> anyhow::Result<RateLimitResult> {
|
||||
let user_data = self.user_data(user_key).await?;
|
||||
|
||||
if user_data.user_id == 0 {
|
||||
return Ok(RateLimitResult::UnknownKey);
|
||||
}
|
||||
|
||||
let user_count_per_period = match user_data.user_count_per_period {
|
||||
None => return Ok(RateLimitResult::AllowedUser(user_data.user_id)),
|
||||
Some(x) => x,
|
||||
};
|
||||
|
||||
// user key is valid. now check rate limits
|
||||
if let Some(rate_limiter) = &self.frontend_key_rate_limiter {
|
||||
match rate_limiter
|
||||
.throttle(user_key, Some(user_count_per_period), 1)
|
||||
.await
|
||||
{
|
||||
Ok(DeferredRateLimitResult::Allowed) => {
|
||||
Ok(RateLimitResult::AllowedUser(user_data.user_id))
|
||||
}
|
||||
Ok(DeferredRateLimitResult::RetryAt(retry_at)) => {
|
||||
// TODO: set headers so they know when they can retry
|
||||
// TODO: debug or trace?
|
||||
// this is too verbose, but a stat might be good
|
||||
// TODO: keys are secrets! use the id instead
|
||||
trace!(?user_key, "rate limit exceeded until {:?}", retry_at);
|
||||
Ok(RateLimitResult::RateLimitedUser(
|
||||
user_data.user_id,
|
||||
Some(retry_at),
|
||||
))
|
||||
}
|
||||
Ok(DeferredRateLimitResult::RetryNever) => {
|
||||
// TODO: keys are secret. don't log them!
|
||||
trace!(?user_key, "rate limit is 0");
|
||||
Ok(RateLimitResult::RateLimitedUser(user_data.user_id, None))
|
||||
}
|
||||
Err(err) => {
|
||||
// internal error, not rate limit being hit
|
||||
// TODO: i really want axum to do this for us in a single place.
|
||||
error!(?err, "rate limiter is unhappy. allowing ip");
|
||||
Ok(RateLimitResult::AllowedUser(user_data.user_id))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// TODO: if no redis, rate limit with just a local cache?
|
||||
// if we don't have redis, we probably don't have a db, so this probably will never happen
|
||||
Err(anyhow::anyhow!("no redis. cannot rate limit"))
|
||||
}
|
||||
}
|
||||
}
|
@ -1,13 +1,13 @@
|
||||
use super::authorization::{ip_is_authorized, key_is_authorized};
|
||||
use super::errors::FrontendResult;
|
||||
use super::rate_limit::{rate_limit_by_ip, rate_limit_by_key};
|
||||
use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum};
|
||||
use axum::extract::{Host, Path};
|
||||
use axum::extract::Path;
|
||||
use axum::headers::{Referer, UserAgent};
|
||||
use axum::TypedHeader;
|
||||
use axum::{response::IntoResponse, Extension, Json};
|
||||
use axum_client_ip::ClientIp;
|
||||
use std::sync::Arc;
|
||||
use tracing::{debug_span, error_span, Instrument};
|
||||
use tracing::{error_span, Instrument};
|
||||
use uuid::Uuid;
|
||||
|
||||
pub async fn public_proxy_web3_rpc(
|
||||
@ -19,7 +19,7 @@ pub async fn public_proxy_web3_rpc(
|
||||
) -> FrontendResult {
|
||||
let request_span = error_span!("request", %ip, ?referer, ?user_agent);
|
||||
|
||||
let ip = rate_limit_by_ip(&app, ip)
|
||||
let ip = ip_is_authorized(&app, ip)
|
||||
.instrument(request_span.clone())
|
||||
.await?;
|
||||
|
||||
@ -38,15 +38,20 @@ pub async fn user_proxy_web3_rpc(
|
||||
user_agent: Option<TypedHeader<UserAgent>>,
|
||||
Path(user_key): Path<Uuid>,
|
||||
) -> FrontendResult {
|
||||
let request_span =
|
||||
error_span!("request", %ip, ?referer, ?user_agent, user_id = tracing::field::Empty);
|
||||
let request_span = error_span!("request", %ip, ?referer, ?user_agent);
|
||||
|
||||
// TODO: this should probably return the user_key_id instead? or maybe both?
|
||||
let user_id = rate_limit_by_key(&app, user_key)
|
||||
.instrument(request_span.clone())
|
||||
.await?;
|
||||
let authorized_request = key_is_authorized(
|
||||
&app,
|
||||
user_key,
|
||||
ip,
|
||||
referer.map(|x| x.0),
|
||||
user_agent.map(|x| x.0),
|
||||
)
|
||||
.instrument(request_span.clone())
|
||||
.await?;
|
||||
|
||||
request_span.record("user_id", user_id);
|
||||
let request_span = error_span!("request", ?authorized_request);
|
||||
|
||||
let f = tokio::spawn(async move { app.proxy_web3_rpc(payload).instrument(request_span).await });
|
||||
|
||||
|
@ -1,10 +1,11 @@
|
||||
use super::authorization::{ip_is_authorized, key_is_authorized, AuthorizedRequest};
|
||||
use super::errors::FrontendResult;
|
||||
use super::rate_limit::{rate_limit_by_ip, rate_limit_by_key};
|
||||
use axum::headers::{Referer, UserAgent};
|
||||
use axum::{
|
||||
extract::ws::{Message, WebSocket, WebSocketUpgrade},
|
||||
extract::Path,
|
||||
response::{IntoResponse, Redirect},
|
||||
Extension,
|
||||
Extension, TypedHeader,
|
||||
};
|
||||
use axum_client_ip::ClientIp;
|
||||
use axum_macros::debug_handler;
|
||||
@ -32,15 +33,15 @@ pub async fn public_websocket_handler(
|
||||
ClientIp(ip): ClientIp,
|
||||
ws_upgrade: Option<WebSocketUpgrade>,
|
||||
) -> FrontendResult {
|
||||
let _ip = rate_limit_by_ip(&app, ip).await?;
|
||||
let authorized_request = ip_is_authorized(&app, ip).await?;
|
||||
|
||||
let user_id = 0;
|
||||
|
||||
let user_span = error_span!("user", user_id);
|
||||
let request_span = error_span!("request", ?authorized_request);
|
||||
|
||||
match ws_upgrade {
|
||||
Some(ws) => Ok(ws
|
||||
.on_upgrade(|socket| proxy_web3_socket(app, socket).instrument(user_span))
|
||||
.on_upgrade(|socket| {
|
||||
proxy_web3_socket(app, authorized_request, socket).instrument(request_span)
|
||||
})
|
||||
.into_response()),
|
||||
None => {
|
||||
// this is not a websocket. redirect to a friendly page
|
||||
@ -52,18 +53,29 @@ pub async fn public_websocket_handler(
|
||||
#[debug_handler]
|
||||
pub async fn user_websocket_handler(
|
||||
Extension(app): Extension<Arc<Web3ProxyApp>>,
|
||||
ClientIp(ip): ClientIp,
|
||||
Path(user_key): Path<Uuid>,
|
||||
referer: Option<TypedHeader<Referer>>,
|
||||
user_agent: Option<TypedHeader<UserAgent>>,
|
||||
ws_upgrade: Option<WebSocketUpgrade>,
|
||||
) -> FrontendResult {
|
||||
let user_id: u64 = rate_limit_by_key(&app, user_key).await?;
|
||||
let authorized_request = key_is_authorized(
|
||||
&app,
|
||||
user_key,
|
||||
ip,
|
||||
referer.map(|x| x.0),
|
||||
user_agent.map(|x| x.0),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// log the id, not the address. we don't want to expose the user's address
|
||||
// TODO: type that wraps Address and have it censor? would protect us from accidently logging addresses
|
||||
let user_span = error_span!("user", user_id);
|
||||
let request_span = error_span!("request", ?authorized_request);
|
||||
|
||||
match ws_upgrade {
|
||||
Some(ws_upgrade) => Ok(ws_upgrade
|
||||
.on_upgrade(move |socket| proxy_web3_socket(app, socket).instrument(user_span))),
|
||||
Some(ws_upgrade) => Ok(ws_upgrade.on_upgrade(move |socket| {
|
||||
proxy_web3_socket(app, authorized_request, socket).instrument(request_span)
|
||||
})),
|
||||
None => {
|
||||
// TODO: store this on the app and use register_template?
|
||||
let reg = Handlebars::new();
|
||||
@ -73,7 +85,7 @@ pub async fn user_websocket_handler(
|
||||
let user_url = reg
|
||||
.render_template(
|
||||
&app.config.redirect_user_url,
|
||||
&json!({ "user_id": user_id }),
|
||||
&json!({ "authorized_request": authorized_request }),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -83,7 +95,11 @@ pub async fn user_websocket_handler(
|
||||
}
|
||||
}
|
||||
|
||||
async fn proxy_web3_socket(app: Arc<Web3ProxyApp>, socket: WebSocket) {
|
||||
async fn proxy_web3_socket(
|
||||
app: Arc<Web3ProxyApp>,
|
||||
authorized_request: AuthorizedRequest,
|
||||
socket: WebSocket,
|
||||
) {
|
||||
// split the websocket so we can read and write concurrently
|
||||
let (ws_tx, ws_rx) = socket.split();
|
||||
|
||||
@ -91,7 +107,12 @@ async fn proxy_web3_socket(app: Arc<Web3ProxyApp>, socket: WebSocket) {
|
||||
let (response_sender, response_receiver) = flume::unbounded::<Message>();
|
||||
|
||||
tokio::spawn(write_web3_socket(response_receiver, ws_tx));
|
||||
tokio::spawn(read_web3_socket(app, ws_rx, response_sender));
|
||||
tokio::spawn(read_web3_socket(
|
||||
app,
|
||||
authorized_request,
|
||||
ws_rx,
|
||||
response_sender,
|
||||
));
|
||||
}
|
||||
|
||||
/// websockets support a few more methods than http clients
|
||||
@ -173,6 +194,7 @@ async fn handle_socket_payload(
|
||||
|
||||
async fn read_web3_socket(
|
||||
app: Arc<Web3ProxyApp>,
|
||||
authorized_request: AuthorizedRequest,
|
||||
mut ws_rx: SplitStream<WebSocket>,
|
||||
response_sender: flume::Sender<Message>,
|
||||
) {
|
||||
|
@ -7,8 +7,8 @@
|
||||
// I wonder how we handle payment
|
||||
// probably have to do manual withdrawals
|
||||
|
||||
use super::authorization::ip_is_authorized;
|
||||
use super::errors::FrontendResult;
|
||||
use super::rate_limit::rate_limit_by_ip;
|
||||
use crate::{app::Web3ProxyApp, users::new_api_key};
|
||||
use anyhow::Context;
|
||||
use axum::{
|
||||
@ -42,7 +42,7 @@ pub async fn get_login(
|
||||
// TODO: allow ENS names here?
|
||||
Path(mut params): Path<HashMap<String, String>>,
|
||||
) -> FrontendResult {
|
||||
let _ip = rate_limit_by_ip(&app, ip).await?;
|
||||
let _ip = ip_is_authorized(&app, ip).await?;
|
||||
|
||||
// at first i thought about checking that user_address is in our db
|
||||
// but theres no need to separate the registration and login flows
|
||||
@ -144,7 +144,7 @@ pub async fn post_login(
|
||||
Json(payload): Json<PostLogin>,
|
||||
Query(query): Query<PostLoginQuery>,
|
||||
) -> FrontendResult {
|
||||
let _ip = rate_limit_by_ip(&app, ip).await?;
|
||||
let _ip = ip_is_authorized(&app, ip).await?;
|
||||
|
||||
if let Some(invite_code) = &app.config.invite_code {
|
||||
// we don't do per-user referral codes because we shouldn't collect what we don't need.
|
||||
@ -273,7 +273,7 @@ pub async fn post_user(
|
||||
Extension(app): Extension<Arc<Web3ProxyApp>>,
|
||||
Json(payload): Json<PostUser>,
|
||||
) -> FrontendResult {
|
||||
let _ip = rate_limit_by_ip(&app, ip).await?;
|
||||
let _ip = ip_is_authorized(&app, ip).await?;
|
||||
|
||||
let user = ProtectedAction::PostUser
|
||||
.verify(app.as_ref(), bearer_token, &payload.primary_address)
|
||||
|
@ -4,7 +4,7 @@ use axum::response::{IntoResponse, Response};
|
||||
use axum::{routing::get, Extension, Router};
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use tracing::{info, instrument};
|
||||
use tracing::info;
|
||||
|
||||
use crate::app::Web3ProxyApp;
|
||||
|
||||
|
@ -12,7 +12,7 @@ use std::sync::atomic;
|
||||
use std::sync::Arc;
|
||||
use tokio::time::{sleep, Duration, Instant};
|
||||
use tracing::Level;
|
||||
use tracing::{debug, error, trace, warn, Event};
|
||||
use tracing::{debug, error, trace, warn};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum OpenRequestResult {
|
||||
@ -142,28 +142,6 @@ impl OpenRequestHandle {
|
||||
}
|
||||
RequestErrorHandler::SaveReverts(chance) => {
|
||||
// TODO: only set SaveReverts if this is an eth_call or eth_estimateGas? we'll need eth_sendRawTransaction somewhere else
|
||||
|
||||
if let Some(metadata) = tracing::Span::current().metadata() {
|
||||
let fields = metadata.fields();
|
||||
|
||||
if let Some(user_id) = fields.field("user_id") {
|
||||
let values = [(&user_id, None)];
|
||||
|
||||
let valueset = fields.value_set(&values);
|
||||
|
||||
let visitor = todo!();
|
||||
|
||||
valueset.record(visitor);
|
||||
|
||||
// TODO: now how we do we get the current value out of it? we might need this index
|
||||
} else {
|
||||
warn!("no user id");
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: check the span for user_key_id
|
||||
|
||||
// TODO: only set SaveReverts for
|
||||
// TODO: logging every one is going to flood the database
|
||||
// TODO: have a percent chance to do this. or maybe a "logged reverts per second"
|
||||
if let ProviderError::JsonRpcClientError(err) = err {
|
||||
|
Loading…
Reference in New Issue
Block a user