From 6905e9fd46939c8cdbc1c05a721ed9f8f9ef9f78 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 22 Sep 2022 19:57:21 +0000 Subject: [PATCH] create a struct for authenticated requests that we need for per-key stats --- Cargo.lock | 22 +-- config/example.toml | 1 + docker-compose.yml | 16 ++ entities/Cargo.toml | 2 +- web3_proxy/Cargo.toml | 6 +- web3_proxy/src/app.rs | 11 +- web3_proxy/src/config.rs | 2 +- web3_proxy/src/frontend/errors.rs | 11 +- web3_proxy/src/frontend/http.rs | 1 - web3_proxy/src/frontend/mod.rs | 2 +- web3_proxy/src/frontend/rate_limit.rs | 185 ---------------------- web3_proxy/src/frontend/rpc_proxy_http.rs | 25 +-- web3_proxy/src/frontend/rpc_proxy_ws.rs | 50 ++++-- web3_proxy/src/frontend/users.rs | 8 +- web3_proxy/src/metrics_frontend.rs | 2 +- web3_proxy/src/rpcs/request.rs | 24 +-- 16 files changed, 103 insertions(+), 265 deletions(-) delete mode 100644 web3_proxy/src/frontend/rate_limit.rs diff --git a/Cargo.lock b/Cargo.lock index 3c840fe5..6d338b94 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -100,9 +100,9 @@ checksum = "983cd8b9d4b02a6dc6ffa557262eb5858a27a0038ffffe21a0f133eaa819a164" [[package]] name = "argh" -version = "0.1.8" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7e7e4aa7e40747e023c0761dafcb42333a9517575bbf1241747f68dd3177a62" +checksum = "c375edecfd2074d5edcc31396860b6e54b6f928714d0e097b983053fac0cabe3" dependencies = [ "argh_derive", "argh_shared", @@ -110,12 +110,12 @@ dependencies = [ [[package]] name = "argh_derive" -version = "0.1.8" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69f2bd7ff6ed6414f4e5521bd509bae46454bbd513801767ced3f21a751ab4bc" +checksum = "aa013479b80109a1bf01a039412b0f0013d716f36921226d86c6709032fb7a03" dependencies = [ "argh_shared", - "heck 0.3.3", + "heck 0.4.0", "proc-macro2", "quote", "syn", @@ -123,9 +123,9 @@ dependencies = [ [[package]] name = "argh_shared" -version = "0.1.8" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47253b98986dafc7a3e1cf3259194f1f47ac61abb57a57f46ec09e48d004ecda" +checksum = "149f75bbec1827618262e0855a68f0f9a7f2edc13faebf33c4f16d6725edb6a9" [[package]] name = "arrayvec" @@ -4276,9 +4276,9 @@ checksum = "930c0acf610d3fdb5e2ab6213019aaa04e227ebe9547b0649ba599b16d788bd7" [[package]] name = "serde" -version = "1.0.144" +version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f747710de3dcd43b88c9168773254e809d8ddbdf9653b84e2554ab219f17860" +checksum = "728eb6351430bccb993660dfffc5a72f91ccc1295abaa8ce19b27ebe4f75568b" dependencies = [ "serde_derive", ] @@ -4295,9 +4295,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.144" +version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94ed3a816fb1d101812f83e789f888322c34e291f894f19590dc310963e87a00" +checksum = "81fa1584d3d1bcacd84c277a0dfe21f5b0f6accf4a23d04d4c6d61f1af522b4c" dependencies = [ "proc-macro2", "quote", diff --git a/config/example.toml b/config/example.toml index 0d915dc7..f1626e14 100644 --- a/config/example.toml +++ b/config/example.toml @@ -3,6 +3,7 @@ chain_id = 1 db_url = "mysql://root:dev_web3_proxy@dev-db:3306/dev_web3_proxy" # TODO: how do we find the optimal db_max_connections? too high actually ends up being slower db_max_connections = 99 +influxdb_url = "http://influxdb:8086" min_sum_soft_limit = 2000 min_synced_rpcs = 2 redis_url = "redis://dev-redis:6379/" diff --git a/docker-compose.yml b/docker-compose.yml index 2395e628..36703fcb 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -20,6 +20,22 @@ services: volumes: - ./data/dev_mysql:/var/lib/mysql + dev-influxdb: + image: influxdb:latest + ports: + - '127.0.0.1:18086:8086' + volumes: + - ./data/dev_influxdb:/var/lib/influxdb + environment: + - INFLUXDB_DB=db0 + - INFLUXDB_ADMIN_USER=admin + - INFLUXDB_ADMIN_PASSWORD=dev_web3_proxy + + dev-otel-collector: + image: otel/opentelemetry-collector-dev:latest + expose: + - 4317 + dev-adminer: image: adminer ports: diff --git a/entities/Cargo.toml b/entities/Cargo.toml index 0ef913f8..1ab870cf 100644 --- a/entities/Cargo.toml +++ b/entities/Cargo.toml @@ -11,5 +11,5 @@ path = "src/mod.rs" [dependencies] sea-orm = "0.9.2" -serde = "1.0.144" +serde = "1.0.145" uuid = "1.1.2" diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index d246b498..b17a5f88 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -21,7 +21,7 @@ redis-rate-limiter = { path = "../redis-rate-limiter" } anyhow = { version = "1.0.65", features = ["backtrace"] } arc-swap = "1.5.1" -argh = "0.1.8" +argh = "0.1.9" axum = { version = "0.5.16", features = ["headers", "serde_json", "tokio-tungstenite", "ws"] } axum-auth = "0.3.0" axum-client-ip = "0.2.0" @@ -51,7 +51,7 @@ handlebars = "4.3.4" rustc-hash = "1.1.0" siwe = "0.4.2" sea-orm = { version = "0.9.2", features = ["macros"] } -serde = { version = "1.0.144", features = [] } +serde = { version = "1.0.145", features = [] } serde_json = { version = "1.0.85", default-features = false, features = ["alloc", "raw_value"] } serde_prometheus = "0.1.6" # TODO: make sure this time version matches siwe. PR to put this in their prelude @@ -59,7 +59,7 @@ time = "0.3.14" tokio = { version = "1.21.1", features = ["full", "tracing"] } # TODO: make sure this uuid version matches sea-orm. PR to put this in their prelude tokio-stream = { version = "0.1.10", features = ["sync"] } -tower-cookies = "0.7" +tower-cookies = "0.7.0" toml = "0.5.9" tower = "0.4.13" tower-request-id = "0.2.0" diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index a2200bdf..671cda31 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -41,7 +41,7 @@ use tokio::sync::{broadcast, watch}; use tokio::task::JoinHandle; use tokio::time::timeout; use tokio_stream::wrappers::{BroadcastStream, WatchStream}; -use tracing::{error, info, info_span, instrument, trace, warn, Instrument}; +use tracing::{error, info, trace, warn}; use uuid::Uuid; // TODO: make this customizable? @@ -60,9 +60,10 @@ type ResponseCache = pub type AnyhowJoinHandle = JoinHandle>; -#[derive(Clone, Copy, From)] -pub struct UserCacheValue { - pub user_id: u64, +#[derive(Clone, Copy, Debug, From, Serialize)] +/// TODO: rename this? +pub struct UserData { + pub user_key_id: u64, /// if None, allow unlimited queries pub user_count_per_period: Option, } @@ -90,7 +91,7 @@ pub struct Web3ProxyApp { pub frontend_ip_rate_limiter: Option>, pub frontend_key_rate_limiter: Option>, pub redis_pool: Option, - pub user_cache: Cache, + pub user_cache: Cache, } /// flatten a JoinError into an anyhow error diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 3faa3e41..46f00077 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -9,7 +9,6 @@ use hashbrown::HashMap; use serde::Deserialize; use std::sync::Arc; use tokio::sync::broadcast; -use tracing::instrument; pub type BlockAndRpc = (Option, Arc); pub type TxHashAndRpc = (TxHash, Arc); @@ -53,6 +52,7 @@ pub struct AppConfig { /// minimum size of the connection pool for the database /// If none, the minimum * 2 is used pub db_max_connections: Option, + pub influxdb_url: Option, #[serde(default = "default_default_requests_per_minute")] pub default_requests_per_minute: u64, pub invite_code: Option, diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index fb59767b..5dbf2aaf 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -1,4 +1,4 @@ -use crate::jsonrpc::JsonRpcForwardedResponse; +use crate::{app::UserData, jsonrpc::JsonRpcForwardedResponse}; use axum::{ http::StatusCode, response::{IntoResponse, Response}, @@ -9,7 +9,7 @@ use redis_rate_limiter::redis::RedisError; use sea_orm::DbErr; use std::{error::Error, net::IpAddr}; use tokio::time::Instant; -use tracing::{instrument, warn}; +use tracing::warn; // TODO: take "IntoResult" instead? pub type FrontendResult = Result; @@ -21,7 +21,7 @@ pub enum FrontendErrorResponse { Redis(RedisError), Response(Response), Database(DbErr), - RateLimitedUser(u64, Option), + RateLimitedUser(UserData, Option), RateLimitedIp(IpAddr, Option), UnknownKey, NotFound, @@ -95,13 +95,14 @@ impl IntoResponse for FrontendErrorResponse { ) } // TODO: this should actually by the id of the key. multiple users might control one key - Self::RateLimitedUser(user_id, retry_at) => { + Self::RateLimitedUser(user_data, retry_at) => { // TODO: emit a stat // TODO: include retry_at in the error ( StatusCode::TOO_MANY_REQUESTS, JsonRpcForwardedResponse::from_string( - format!("too many requests from user {}!", user_id), + // TODO: better error + format!("too many requests from {:?}!", user_data), Some(StatusCode::TOO_MANY_REQUESTS.as_u16().into()), None, ), diff --git a/web3_proxy/src/frontend/http.rs b/web3_proxy/src/frontend/http.rs index 98b7fc21..36322ab2 100644 --- a/web3_proxy/src/frontend/http.rs +++ b/web3_proxy/src/frontend/http.rs @@ -3,7 +3,6 @@ use axum::{http::StatusCode, response::IntoResponse, Extension, Json}; use moka::future::ConcurrentCacheExt; use serde_json::json; use std::sync::Arc; -use tracing::instrument; /// Health check page for load balancers to use pub async fn health(Extension(app): Extension>) -> impl IntoResponse { diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index f37a5eb3..48aea6df 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -1,6 +1,6 @@ +mod authorization; mod errors; mod http; -mod rate_limit; mod rpc_proxy_http; mod rpc_proxy_ws; mod users; diff --git a/web3_proxy/src/frontend/rate_limit.rs b/web3_proxy/src/frontend/rate_limit.rs deleted file mode 100644 index 6c4622cf..00000000 --- a/web3_proxy/src/frontend/rate_limit.rs +++ /dev/null @@ -1,185 +0,0 @@ -use super::errors::FrontendErrorResponse; -use crate::app::{UserCacheValue, Web3ProxyApp}; -use anyhow::Context; -use deferred_rate_limiter::DeferredRateLimitResult; -use entities::user_keys; -use sea_orm::{ - ColumnTrait, DeriveColumn, EntityTrait, EnumIter, IdenStatic, QueryFilter, QuerySelect, -}; -use std::{net::IpAddr, sync::Arc}; -use tokio::time::Instant; -use tracing::{error, trace}; -use uuid::Uuid; - -#[derive(Debug)] -pub enum RateLimitResult { - AllowedIp(IpAddr), - AllowedUser(u64), - RateLimitedIp(IpAddr, Option), - RateLimitedUser(u64, Option), - UnknownKey, -} - -pub async fn rate_limit_by_ip( - app: &Web3ProxyApp, - ip: IpAddr, -) -> Result { - match app.rate_limit_by_ip(ip).await? { - RateLimitResult::AllowedIp(x) => Ok(x), - RateLimitResult::RateLimitedIp(x, retry_at) => { - Err(FrontendErrorResponse::RateLimitedIp(x, retry_at)) - } - // TODO: don't panic. give the user an error - x => unimplemented!("rate_limit_by_ip shouldn't ever see these: {:?}", x), - } -} - -pub async fn rate_limit_by_key( - app: &Web3ProxyApp, - // TODO: change this to a Ulid - user_key: Uuid, -) -> Result { - match app.rate_limit_by_key(user_key).await? { - RateLimitResult::AllowedUser(x) => Ok(x), - RateLimitResult::RateLimitedUser(x, retry_at) => { - Err(FrontendErrorResponse::RateLimitedUser(x, retry_at)) - } - RateLimitResult::UnknownKey => Err(FrontendErrorResponse::UnknownKey), - // TODO: don't panic. give the user an error - x => unimplemented!("rate_limit_by_key shouldn't ever see these: {:?}", x), - } -} - -impl Web3ProxyApp { - pub async fn rate_limit_by_ip(&self, ip: IpAddr) -> anyhow::Result { - // TODO: dry this up with rate_limit_by_key - // TODO: have a local cache because if we hit redis too hard we get errors - // TODO: query redis in the background so that users don't have to wait on this network request - if let Some(rate_limiter) = &self.frontend_ip_rate_limiter { - match rate_limiter.throttle(ip, None, 1).await { - Ok(DeferredRateLimitResult::Allowed) => Ok(RateLimitResult::AllowedIp(ip)), - Ok(DeferredRateLimitResult::RetryAt(retry_at)) => { - // TODO: set headers so they know when they can retry - // TODO: debug or trace? - // this is too verbose, but a stat might be good - trace!(?ip, "rate limit exceeded until {:?}", retry_at); - Ok(RateLimitResult::RateLimitedIp(ip, Some(retry_at))) - } - Ok(DeferredRateLimitResult::RetryNever) => { - // TODO: i don't think we'll get here. maybe if we ban an IP forever? seems unlikely - trace!(?ip, "rate limit is 0"); - Ok(RateLimitResult::RateLimitedIp(ip, None)) - } - Err(err) => { - // internal error, not rate limit being hit - // TODO: i really want axum to do this for us in a single place. - error!(?err, "rate limiter is unhappy. allowing ip"); - Ok(RateLimitResult::AllowedIp(ip)) - } - } - } else { - // TODO: if no redis, rate limit with a local cache? "warn!" probably isn't right - todo!("no rate limiter"); - } - } - - // check the local cache for user data, or query the database - pub(crate) async fn user_data(&self, user_key: Uuid) -> anyhow::Result { - let db = self.db_conn.as_ref().context("no database")?; - - let user_data: Result<_, Arc> = self - .user_cache - .try_get_with(user_key, async move { - trace!(?user_key, "user_cache miss"); - - /// helper enum for querying just a few columns instead of the entire table - #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] - enum QueryAs { - UserId, - RequestsPerMinute, - } - - // TODO: join the user table to this to return the User? we don't always need it - match user_keys::Entity::find() - .select_only() - .column_as(user_keys::Column::UserId, QueryAs::UserId) - .column_as( - user_keys::Column::RequestsPerMinute, - QueryAs::RequestsPerMinute, - ) - .filter(user_keys::Column::ApiKey.eq(user_key)) - .filter(user_keys::Column::Active.eq(true)) - .into_values::<_, QueryAs>() - .one(db) - .await? - { - Some((user_id, requests_per_minute)) => { - // TODO: add a column here for max, or is u64::MAX fine? - let user_count_per_period = if requests_per_minute == u64::MAX { - None - } else { - Some(requests_per_minute) - }; - - Ok(UserCacheValue::from((user_id, user_count_per_period))) - } - None => Ok(UserCacheValue::from((0, Some(0)))), - } - }) - .await; - - // TODO: i'm not actually sure about this expect - user_data.map_err(|err| Arc::try_unwrap(err).expect("this should be the only reference")) - } - - pub async fn rate_limit_by_key(&self, user_key: Uuid) -> anyhow::Result { - let user_data = self.user_data(user_key).await?; - - if user_data.user_id == 0 { - return Ok(RateLimitResult::UnknownKey); - } - - let user_count_per_period = match user_data.user_count_per_period { - None => return Ok(RateLimitResult::AllowedUser(user_data.user_id)), - Some(x) => x, - }; - - // user key is valid. now check rate limits - if let Some(rate_limiter) = &self.frontend_key_rate_limiter { - match rate_limiter - .throttle(user_key, Some(user_count_per_period), 1) - .await - { - Ok(DeferredRateLimitResult::Allowed) => { - Ok(RateLimitResult::AllowedUser(user_data.user_id)) - } - Ok(DeferredRateLimitResult::RetryAt(retry_at)) => { - // TODO: set headers so they know when they can retry - // TODO: debug or trace? - // this is too verbose, but a stat might be good - // TODO: keys are secrets! use the id instead - trace!(?user_key, "rate limit exceeded until {:?}", retry_at); - Ok(RateLimitResult::RateLimitedUser( - user_data.user_id, - Some(retry_at), - )) - } - Ok(DeferredRateLimitResult::RetryNever) => { - // TODO: keys are secret. don't log them! - trace!(?user_key, "rate limit is 0"); - Ok(RateLimitResult::RateLimitedUser(user_data.user_id, None)) - } - Err(err) => { - // internal error, not rate limit being hit - // TODO: i really want axum to do this for us in a single place. - error!(?err, "rate limiter is unhappy. allowing ip"); - Ok(RateLimitResult::AllowedUser(user_data.user_id)) - } - } - } else { - // TODO: if no redis, rate limit with just a local cache? - // if we don't have redis, we probably don't have a db, so this probably will never happen - Err(anyhow::anyhow!("no redis. cannot rate limit")) - } - } -} diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs index e1a064b3..e3980264 100644 --- a/web3_proxy/src/frontend/rpc_proxy_http.rs +++ b/web3_proxy/src/frontend/rpc_proxy_http.rs @@ -1,13 +1,13 @@ +use super::authorization::{ip_is_authorized, key_is_authorized}; use super::errors::FrontendResult; -use super::rate_limit::{rate_limit_by_ip, rate_limit_by_key}; use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum}; -use axum::extract::{Host, Path}; +use axum::extract::Path; use axum::headers::{Referer, UserAgent}; use axum::TypedHeader; use axum::{response::IntoResponse, Extension, Json}; use axum_client_ip::ClientIp; use std::sync::Arc; -use tracing::{debug_span, error_span, Instrument}; +use tracing::{error_span, Instrument}; use uuid::Uuid; pub async fn public_proxy_web3_rpc( @@ -19,7 +19,7 @@ pub async fn public_proxy_web3_rpc( ) -> FrontendResult { let request_span = error_span!("request", %ip, ?referer, ?user_agent); - let ip = rate_limit_by_ip(&app, ip) + let ip = ip_is_authorized(&app, ip) .instrument(request_span.clone()) .await?; @@ -38,15 +38,20 @@ pub async fn user_proxy_web3_rpc( user_agent: Option>, Path(user_key): Path, ) -> FrontendResult { - let request_span = - error_span!("request", %ip, ?referer, ?user_agent, user_id = tracing::field::Empty); + let request_span = error_span!("request", %ip, ?referer, ?user_agent); // TODO: this should probably return the user_key_id instead? or maybe both? - let user_id = rate_limit_by_key(&app, user_key) - .instrument(request_span.clone()) - .await?; + let authorized_request = key_is_authorized( + &app, + user_key, + ip, + referer.map(|x| x.0), + user_agent.map(|x| x.0), + ) + .instrument(request_span.clone()) + .await?; - request_span.record("user_id", user_id); + let request_span = error_span!("request", ?authorized_request); let f = tokio::spawn(async move { app.proxy_web3_rpc(payload).instrument(request_span).await }); diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index a4c99566..7d869936 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -1,10 +1,11 @@ +use super::authorization::{ip_is_authorized, key_is_authorized, AuthorizedRequest}; use super::errors::FrontendResult; -use super::rate_limit::{rate_limit_by_ip, rate_limit_by_key}; +use axum::headers::{Referer, UserAgent}; use axum::{ extract::ws::{Message, WebSocket, WebSocketUpgrade}, extract::Path, response::{IntoResponse, Redirect}, - Extension, + Extension, TypedHeader, }; use axum_client_ip::ClientIp; use axum_macros::debug_handler; @@ -32,15 +33,15 @@ pub async fn public_websocket_handler( ClientIp(ip): ClientIp, ws_upgrade: Option, ) -> FrontendResult { - let _ip = rate_limit_by_ip(&app, ip).await?; + let authorized_request = ip_is_authorized(&app, ip).await?; - let user_id = 0; - - let user_span = error_span!("user", user_id); + let request_span = error_span!("request", ?authorized_request); match ws_upgrade { Some(ws) => Ok(ws - .on_upgrade(|socket| proxy_web3_socket(app, socket).instrument(user_span)) + .on_upgrade(|socket| { + proxy_web3_socket(app, authorized_request, socket).instrument(request_span) + }) .into_response()), None => { // this is not a websocket. redirect to a friendly page @@ -52,18 +53,29 @@ pub async fn public_websocket_handler( #[debug_handler] pub async fn user_websocket_handler( Extension(app): Extension>, + ClientIp(ip): ClientIp, Path(user_key): Path, + referer: Option>, + user_agent: Option>, ws_upgrade: Option, ) -> FrontendResult { - let user_id: u64 = rate_limit_by_key(&app, user_key).await?; + let authorized_request = key_is_authorized( + &app, + user_key, + ip, + referer.map(|x| x.0), + user_agent.map(|x| x.0), + ) + .await?; // log the id, not the address. we don't want to expose the user's address // TODO: type that wraps Address and have it censor? would protect us from accidently logging addresses - let user_span = error_span!("user", user_id); + let request_span = error_span!("request", ?authorized_request); match ws_upgrade { - Some(ws_upgrade) => Ok(ws_upgrade - .on_upgrade(move |socket| proxy_web3_socket(app, socket).instrument(user_span))), + Some(ws_upgrade) => Ok(ws_upgrade.on_upgrade(move |socket| { + proxy_web3_socket(app, authorized_request, socket).instrument(request_span) + })), None => { // TODO: store this on the app and use register_template? let reg = Handlebars::new(); @@ -73,7 +85,7 @@ pub async fn user_websocket_handler( let user_url = reg .render_template( &app.config.redirect_user_url, - &json!({ "user_id": user_id }), + &json!({ "authorized_request": authorized_request }), ) .unwrap(); @@ -83,7 +95,11 @@ pub async fn user_websocket_handler( } } -async fn proxy_web3_socket(app: Arc, socket: WebSocket) { +async fn proxy_web3_socket( + app: Arc, + authorized_request: AuthorizedRequest, + socket: WebSocket, +) { // split the websocket so we can read and write concurrently let (ws_tx, ws_rx) = socket.split(); @@ -91,7 +107,12 @@ async fn proxy_web3_socket(app: Arc, socket: WebSocket) { let (response_sender, response_receiver) = flume::unbounded::(); tokio::spawn(write_web3_socket(response_receiver, ws_tx)); - tokio::spawn(read_web3_socket(app, ws_rx, response_sender)); + tokio::spawn(read_web3_socket( + app, + authorized_request, + ws_rx, + response_sender, + )); } /// websockets support a few more methods than http clients @@ -173,6 +194,7 @@ async fn handle_socket_payload( async fn read_web3_socket( app: Arc, + authorized_request: AuthorizedRequest, mut ws_rx: SplitStream, response_sender: flume::Sender, ) { diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index fbbcfe5c..7e17a011 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -7,8 +7,8 @@ // I wonder how we handle payment // probably have to do manual withdrawals +use super::authorization::ip_is_authorized; use super::errors::FrontendResult; -use super::rate_limit::rate_limit_by_ip; use crate::{app::Web3ProxyApp, users::new_api_key}; use anyhow::Context; use axum::{ @@ -42,7 +42,7 @@ pub async fn get_login( // TODO: allow ENS names here? Path(mut params): Path>, ) -> FrontendResult { - let _ip = rate_limit_by_ip(&app, ip).await?; + let _ip = ip_is_authorized(&app, ip).await?; // at first i thought about checking that user_address is in our db // but theres no need to separate the registration and login flows @@ -144,7 +144,7 @@ pub async fn post_login( Json(payload): Json, Query(query): Query, ) -> FrontendResult { - let _ip = rate_limit_by_ip(&app, ip).await?; + let _ip = ip_is_authorized(&app, ip).await?; if let Some(invite_code) = &app.config.invite_code { // we don't do per-user referral codes because we shouldn't collect what we don't need. @@ -273,7 +273,7 @@ pub async fn post_user( Extension(app): Extension>, Json(payload): Json, ) -> FrontendResult { - let _ip = rate_limit_by_ip(&app, ip).await?; + let _ip = ip_is_authorized(&app, ip).await?; let user = ProtectedAction::PostUser .verify(app.as_ref(), bearer_token, &payload.primary_address) diff --git a/web3_proxy/src/metrics_frontend.rs b/web3_proxy/src/metrics_frontend.rs index 9056c651..2284017d 100644 --- a/web3_proxy/src/metrics_frontend.rs +++ b/web3_proxy/src/metrics_frontend.rs @@ -4,7 +4,7 @@ use axum::response::{IntoResponse, Response}; use axum::{routing::get, Extension, Router}; use std::net::SocketAddr; use std::sync::Arc; -use tracing::{info, instrument}; +use tracing::info; use crate::app::Web3ProxyApp; diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index c85b5754..6437bf2b 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -12,7 +12,7 @@ use std::sync::atomic; use std::sync::Arc; use tokio::time::{sleep, Duration, Instant}; use tracing::Level; -use tracing::{debug, error, trace, warn, Event}; +use tracing::{debug, error, trace, warn}; #[derive(Debug)] pub enum OpenRequestResult { @@ -142,28 +142,6 @@ impl OpenRequestHandle { } RequestErrorHandler::SaveReverts(chance) => { // TODO: only set SaveReverts if this is an eth_call or eth_estimateGas? we'll need eth_sendRawTransaction somewhere else - - if let Some(metadata) = tracing::Span::current().metadata() { - let fields = metadata.fields(); - - if let Some(user_id) = fields.field("user_id") { - let values = [(&user_id, None)]; - - let valueset = fields.value_set(&values); - - let visitor = todo!(); - - valueset.record(visitor); - - // TODO: now how we do we get the current value out of it? we might need this index - } else { - warn!("no user id"); - } - } - - // TODO: check the span for user_key_id - - // TODO: only set SaveReverts for // TODO: logging every one is going to flood the database // TODO: have a percent chance to do this. or maybe a "logged reverts per second" if let ProviderError::JsonRpcClientError(err) = err {