From 5df2469d53c583170ea841db68d8a0d93f91a525 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sat, 24 Sep 2022 05:53:45 +0000 Subject: [PATCH] ULID or UUID. Prefer ULID --- TODO.md | 21 ++-- web3_proxy/src/app.rs | 26 ++--- .../src/bin/web3_proxy_cli/create_user.rs | 25 +++-- web3_proxy/src/frontend/authorization.rs | 100 ++++++++++++++++-- web3_proxy/src/frontend/mod.rs | 7 +- web3_proxy/src/frontend/rpc_proxy_http.rs | 21 ++-- web3_proxy/src/frontend/rpc_proxy_ws.rs | 78 ++++++++++---- web3_proxy/src/frontend/users.rs | 15 +-- web3_proxy/src/lib.rs | 1 - web3_proxy/src/rpcs/blockchain.rs | 8 +- web3_proxy/src/rpcs/connection.rs | 8 +- web3_proxy/src/rpcs/connections.rs | 19 ++-- web3_proxy/src/rpcs/request.rs | 17 +-- web3_proxy/src/users.rs | 11 -- 14 files changed, 239 insertions(+), 118 deletions(-) delete mode 100644 web3_proxy/src/users.rs diff --git a/TODO.md b/TODO.md index 9f8a47ea..52dc6167 100644 --- a/TODO.md +++ b/TODO.md @@ -166,6 +166,11 @@ These are roughly in order of completition - for security, we want these limits low. - [x] user login should return the bearer token and the user keys - [x] use siwe messages and signatures for sign up and login +- [x] check for bearer token on /rpc +- [x] ip blocking logs a warn. we don't need that +- [x] Ulid instead of Uuid for user keys + - + - since users are actively using our service, we will need to support both - [ ] active requests per second per api key - [ ] distribution of methods per api key (eth_call, eth_getLogs, etc.) - [-] let users choose a % to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly @@ -174,13 +179,9 @@ These are roughly in order of completition - [-] add configurable size limits to all the Caches - [ ] endpoint for creating/modifying api keys and their advanced security features - [ ] BUG: i think if all backend servers stop, the server doesn't properly reconnect. It appears to stop listening on 8854, but not shut down. -- [ ] Ulid instead of Uuid for user keys - - - - since users are actively using our service, we will need to support both -- [ ] Ulid instead of Uuid for database ids - - might have to use Uuid in sea-orm and then convert to Ulid on display - [ ] option to rotate api key - [ ] read the cookie key from a file. easy to re-use and no giant blob of hex in our app config +- [ ] if no bearer token found in redis (likely because it expired), send 401 unauthorized ## V1 @@ -275,7 +276,7 @@ in another repo: event subscriber ## "Maybe some day" and other Miscellaneous Things -- [ ] tool to revoke bearer tokens that also clears redis +- [ ] tool to revoke bearer tokens that clears redis - [ ] eth_getBlockByNumber and similar calls served from the block map - will need all Block **and** Block in caches or fetched efficiently - so maybe we don't want this. we can just use the general request cache for these. they will only require 1 request and it means requests won't get in the way as much on writes as new blocks arrive. @@ -371,7 +372,6 @@ in another repo: event subscriber - [ ] https://gitlab.com/moka-labs/tiered-cache-example - [ ] web3connection3.block(...) might wait forever. be sure to do it safely - [ ] search for all "todo!" -- [ ] replace all `.context("no servers in sync")` with proper error type - [ ] when using a bunch of slow public servers, i see "no servers in sync" even when things should be right - [ ] i think checking the parents of the heaviest chain works most of the time, but not always - maybe iterate connection heads by total weight? i still think we need to include parent hashes @@ -379,11 +379,14 @@ in another repo: event subscriber - [ ] whats going on here? why is it rolling back? maybe total_difficulty was a LOT higher? - 2022-09-05T19:21:39.763630Z WARN web3_proxy::rpcs::blockchain: chain rolled back 1/6/7 head=15479604 (0xf809…6a2c) rpc=infura_free - i wish i had more logs. its possible that 15479605 came immediatly after -- [ ] ip blocking logs a warn. we don't need that. a stat at most - [ ] keep it working without redis and a database - [ ] web3 on rpc1 exited without errors. maybe promote some shutdown messages from debug to info? - [ ] better handling for offline http servers - if we get a connection refused, we should remove the server's block info so it is taken out of rotation - [ ] web3_proxy_cli command should read database settings from config - [ ] how should we handle reverting transactions? they won't confirm for a while after we send them -- [ ] allow configuration of the expiration time of bearer tokens \ No newline at end of file +- [ ] allow configuration of the expiration time of bearer tokens. currently defaults to 4 weeks +- [ ] instead of putting everything under /rpc, have a site_prefix config? +- [ ] Ulid instead of Uuid for database ids + - might have to use Uuid in sea-orm and then convert to Ulid on display +- [ ] emit stat when an IP/key goes over rate limits diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index dbe5c506..d7951389 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -46,7 +46,7 @@ use tokio::task::JoinHandle; use tokio::time::timeout; use tokio_stream::wrappers::{BroadcastStream, WatchStream}; use tracing::{error, info, trace, warn}; -use uuid::Uuid; +use ulid::Ulid; // TODO: make this customizable? static APP_USER_AGENT: &str = concat!( @@ -103,10 +103,10 @@ pub struct Web3ProxyApp { /// store pending transactions that we've seen so that we don't send duplicates to subscribers pub pending_transactions: Cache, pub frontend_ip_rate_limiter: Option>, - pub frontend_key_rate_limiter: Option>, + pub frontend_key_rate_limiter: Option>, pub login_rate_limiter: Option, pub redis_pool: Option, - pub user_cache: Cache, + pub user_cache: Cache, } /// flatten a JoinError into an anyhow error @@ -356,7 +356,7 @@ impl Web3ProxyApp { rpc_rrl.clone(), None, )); - frontend_key_rate_limiter = Some(DeferredRateLimiter::::new( + frontend_key_rate_limiter = Some(DeferredRateLimiter::::new( 10_000, "key", rpc_rrl, None, )); @@ -435,7 +435,7 @@ impl Web3ProxyApp { #[measure([ErrorCount, HitCount, ResponseTime, Throughput])] pub async fn eth_subscribe<'a>( self: &'a Arc, - authorization: Arc, + authorized_request: Arc, payload: JsonRpcRequest, subscription_count: &'a AtomicUsize, // TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now @@ -629,7 +629,7 @@ impl Web3ProxyApp { /// send the request or batch of requests to the approriate RPCs pub async fn proxy_web3_rpc( self: &Arc, - authorization: &Arc, + authorized_request: &Arc, request: JsonRpcRequestEnum, ) -> anyhow::Result { // TODO: this should probably be trace level @@ -644,14 +644,14 @@ impl Web3ProxyApp { JsonRpcRequestEnum::Single(request) => JsonRpcForwardedResponseEnum::Single( timeout( max_time, - self.proxy_web3_rpc_request(authorization, request), + self.proxy_web3_rpc_request(authorized_request, request), ) .await??, ), JsonRpcRequestEnum::Batch(requests) => JsonRpcForwardedResponseEnum::Batch( timeout( max_time, - self.proxy_web3_rpc_requests(authorization, requests), + self.proxy_web3_rpc_requests(authorized_request, requests), ) .await??, ), @@ -667,7 +667,7 @@ impl Web3ProxyApp { /// TODO: make sure this isn't a problem async fn proxy_web3_rpc_requests( self: &Arc, - authorization: &Arc, + authorized_request: &Arc, requests: Vec, ) -> anyhow::Result> { // TODO: we should probably change ethers-rs to support this directly @@ -675,7 +675,7 @@ impl Web3ProxyApp { let responses = join_all( requests .into_iter() - .map(|request| self.proxy_web3_rpc_request(authorization, request)) + .map(|request| self.proxy_web3_rpc_request(authorized_request, request)) .collect::>(), ) .await; @@ -707,7 +707,7 @@ impl Web3ProxyApp { #[measure([ErrorCount, HitCount, ResponseTime, Throughput])] async fn proxy_web3_rpc_request( self: &Arc, - authorization: &Arc, + authorized_request: &Arc, mut request: JsonRpcRequest, ) -> anyhow::Result { trace!("Received request: {:?}", request); @@ -841,7 +841,7 @@ impl Web3ProxyApp { let rpcs = self.private_rpcs.as_ref().unwrap_or(&self.balanced_rpcs); return rpcs - .try_send_all_upstream_servers(Some(authorization), request, None) + .try_send_all_upstream_servers(Some(authorized_request), request, None) .await; } "eth_syncing" => { @@ -942,7 +942,7 @@ impl Web3ProxyApp { let mut response = self .balanced_rpcs .try_send_best_upstream_server( - Some(authorization), + Some(authorized_request), request, Some(&request_block_id.num), ) diff --git a/web3_proxy/src/bin/web3_proxy_cli/create_user.rs b/web3_proxy/src/bin/web3_proxy_cli/create_user.rs index 45595f4b..82369f78 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/create_user.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/create_user.rs @@ -4,29 +4,30 @@ use entities::{user, user_keys}; use ethers::prelude::Address; use sea_orm::{ActiveModelTrait, TransactionTrait}; use tracing::info; +use ulid::Ulid; use uuid::Uuid; -use web3_proxy::users::new_api_key; +use web3_proxy::frontend::authorization::UserKey; #[derive(FromArgs, PartialEq, Debug, Eq)] /// Create a new user and api key #[argh(subcommand, name = "create_user")] pub struct CreateUserSubCommand { #[argh(option)] - /// the user's ethereum address + /// the user's ethereum address. address: Address, #[argh(option)] - /// the user's optional email + /// the user's optional email. email: Option, - #[argh(option, default = "new_api_key()")] - /// the user's first api key. - /// If none given, one will be generated randomly. - api_key: Uuid, + #[argh(option, default = "UserKey::new()")] + /// the user's first api ULID or UUID key. + /// If none given, one will be created. + api_key: UserKey, #[argh(option)] - /// maximum requests per minute - /// default to "None" which the code sees as "unlimited" requests + /// maximum requests per minute. + /// default to "None" which the code sees as "unlimited" requests. rpm: Option, } @@ -39,6 +40,7 @@ impl CreateUserSubCommand { // TODO: take a simple String. If it starts with 0x, parse as address. otherwise convert ascii to hex let address = self.address.to_fixed_bytes().into(); + // TODO: get existing or create a new one let u = user::ActiveModel { address: sea_orm::Set(address), email: sea_orm::Set(self.email), @@ -57,7 +59,7 @@ impl CreateUserSubCommand { // TODO: requests_per_minute should be configurable let uk = user_keys::ActiveModel { user_id: u.id, - api_key: sea_orm::Set(self.api_key), + api_key: sea_orm::Set(self.api_key.into()), requests_per_minute: sea_orm::Set(self.rpm), ..Default::default() }; @@ -67,7 +69,8 @@ impl CreateUserSubCommand { txn.commit().await?; - info!("user key: {}", uk.api_key.as_ref()); + info!("user key as ULID: {}", Ulid::from(self.api_key)); + info!("user key as UUID: {}", Uuid::from(self.api_key)); Ok(()) } diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index e1f75abf..32a92ca3 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -9,11 +9,85 @@ use redis_rate_limiter::redis::AsyncCommands; use redis_rate_limiter::RedisRateLimitResult; use sea_orm::{prelude::Decimal, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter}; use serde::Serialize; -use std::{net::IpAddr, sync::Arc}; +use std::fmt::Display; +use std::{net::IpAddr, str::FromStr, sync::Arc}; use tokio::time::Instant; use tracing::{error, trace}; +use ulid::Ulid; use uuid::Uuid; +/// This lets us use UUID and ULID while we transition to only ULIDs +#[derive(Copy, Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub enum UserKey { + Ulid(Ulid), + Uuid(Uuid), +} + +impl UserKey { + pub fn new() -> Self { + Ulid::new().into() + } +} + +impl Display for UserKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // TODO: do this without dereferencing + let ulid: Ulid = (*self).into(); + + ulid.fmt(f) + } +} + +impl Default for UserKey { + fn default() -> Self { + Self::new() + } +} + +impl FromStr for UserKey { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + if let Ok(ulid) = s.parse::() { + Ok(ulid.into()) + } else if let Ok(uuid) = s.parse::() { + Ok(uuid.into()) + } else { + Err(anyhow::anyhow!("UserKey was not a ULID or UUID")) + } + } +} + +impl From for UserKey { + fn from(x: Ulid) -> Self { + UserKey::Ulid(x) + } +} + +impl From for UserKey { + fn from(x: Uuid) -> Self { + UserKey::Uuid(x) + } +} + +impl From for Ulid { + fn from(x: UserKey) -> Self { + match x { + UserKey::Ulid(x) => x, + UserKey::Uuid(x) => Ulid::from(x.as_u128()), + } + } +} + +impl From for Uuid { + fn from(x: UserKey) -> Self { + match x { + UserKey::Ulid(x) => Uuid::from_u128(x.0), + UserKey::Uuid(x) => x, + } + } +} + #[derive(Debug)] pub enum RateLimitResult { /// contains the IP of the anonymous user @@ -172,7 +246,15 @@ pub async fn bearer_is_authorized( .context("fetching user key by id")? .context("unknown user id")?; - key_is_authorized(app, user_key_data.api_key, ip, origin, referer, user_agent).await + key_is_authorized( + app, + user_key_data.api_key.into(), + ip, + origin, + referer, + user_agent, + ) + .await } pub async fn ip_is_authorized( @@ -197,7 +279,7 @@ pub async fn ip_is_authorized( pub async fn key_is_authorized( app: &Web3ProxyApp, - user_key: Uuid, + user_key: UserKey, ip: IpAddr, origin: Option, referer: Option, @@ -287,17 +369,19 @@ impl Web3ProxyApp { } // check the local cache for user data, or query the database - pub(crate) async fn user_data(&self, user_key: Uuid) -> anyhow::Result { + pub(crate) async fn user_data(&self, user_key: UserKey) -> anyhow::Result { let user_data: Result<_, Arc> = self .user_cache - .try_get_with(user_key, async move { + .try_get_with(user_key.into(), async move { trace!(?user_key, "user_cache miss"); let db = self.db_conn().context("Getting database connection")?; + let user_uuid: Uuid = user_key.into(); + // TODO: join the user table to this to return the User? we don't always need it match user_keys::Entity::find() - .filter(user_keys::Column::ApiKey.eq(user_key)) + .filter(user_keys::Column::ApiKey.eq(user_uuid)) .filter(user_keys::Column::Active.eq(true)) .one(db) .await? @@ -368,7 +452,7 @@ impl Web3ProxyApp { user_data.map_err(|err| Arc::try_unwrap(err).expect("this should be the only reference")) } - pub async fn rate_limit_by_key(&self, user_key: Uuid) -> anyhow::Result { + pub async fn rate_limit_by_key(&self, user_key: UserKey) -> anyhow::Result { let user_data = self.user_data(user_key).await?; if user_data.user_key_id == 0 { @@ -383,7 +467,7 @@ impl Web3ProxyApp { // user key is valid. now check rate limits if let Some(rate_limiter) = &self.frontend_key_rate_limiter { match rate_limiter - .throttle(user_key, Some(user_max_requests_per_period), 1) + .throttle(user_key.into(), Some(user_max_requests_per_period), 1) .await { Ok(DeferredRateLimitResult::Allowed) => Ok(RateLimitResult::AllowedUser(user_data)), diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index f9feef5f..5a7c4e37 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -47,12 +47,15 @@ pub async fn serve(port: u16, proxy_app: Arc) -> anyhow::Result<() let app = Router::new() // routes should be order most to least common .route("/rpc", post(rpc_proxy_http::proxy_web3_rpc)) - .route("/rpc", get(rpc_proxy_ws::public_websocket_handler)) + .route("/rpc", get(rpc_proxy_ws::websocket_handler)) .route( "/rpc/:user_key", post(rpc_proxy_http::proxy_web3_rpc_with_key), ) - .route("/rpc/:user_key", get(rpc_proxy_ws::user_websocket_handler)) + .route( + "/rpc/:user_key", + get(rpc_proxy_ws::websocket_handler_with_key), + ) .route("/rpc/health", get(http::health)) .route("/rpc/status", get(http::status)) // TODO: make this optional or remove it since it is available on another port diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs index 6cc47ede..f0b3fdf6 100644 --- a/web3_proxy/src/frontend/rpc_proxy_http.rs +++ b/web3_proxy/src/frontend/rpc_proxy_http.rs @@ -9,7 +9,6 @@ use axum::{response::IntoResponse, Extension, Json}; use axum_client_ip::ClientIp; use std::sync::Arc; use tracing::{error_span, Instrument}; -use uuid::Uuid; pub async fn proxy_web3_rpc( Extension(app): Extension>, @@ -22,7 +21,7 @@ pub async fn proxy_web3_rpc( ) -> FrontendResult { let request_span = error_span!("request", %ip, ?referer, ?user_agent); - let authorization = if let Some(TypedHeader(Authorization(bearer))) = bearer { + let authorized_request = if let Some(TypedHeader(Authorization(bearer))) = bearer { let origin = origin.map(|x| x.0); let referer = referer.map(|x| x.0); let user_agent = user_agent.map(|x| x.0); @@ -36,12 +35,12 @@ pub async fn proxy_web3_rpc( .await? }; - let request_span = error_span!("request", ?authorization); + let request_span = error_span!("request", ?authorized_request); - let authorization = Arc::new(authorization); + let authorized_request = Arc::new(authorized_request); let f = tokio::spawn(async move { - app.proxy_web3_rpc(&authorization, payload) + app.proxy_web3_rpc(&authorized_request, payload) .instrument(request_span) .await }); @@ -58,12 +57,14 @@ pub async fn proxy_web3_rpc_with_key( origin: Option>, referer: Option>, user_agent: Option>, - Path(user_key): Path, + Path(user_key): Path, ) -> FrontendResult { + let user_key = user_key.parse()?; + let request_span = error_span!("request", %ip, ?referer, ?user_agent); // TODO: this should probably return the user_key_id instead? or maybe both? - let authorization = key_is_authorized( + let authorized_request = key_is_authorized( &app, user_key, ip, @@ -74,12 +75,12 @@ pub async fn proxy_web3_rpc_with_key( .instrument(request_span.clone()) .await?; - let request_span = error_span!("request", ?authorization); + let request_span = error_span!("request", ?authorized_request); - let authorization = Arc::new(authorization); + let authorized_request = Arc::new(authorized_request); let f = tokio::spawn(async move { - app.proxy_web3_rpc(&authorization, payload) + app.proxy_web3_rpc(&authorized_request, payload) .instrument(request_span) .await }); diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index a6004494..b048cabd 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -1,6 +1,8 @@ -use super::authorization::{ip_is_authorized, key_is_authorized, AuthorizedRequest}; +use super::authorization::{ + bearer_is_authorized, ip_is_authorized, key_is_authorized, AuthorizedRequest, +}; use super::errors::FrontendResult; -use axum::headers::{Origin, Referer, UserAgent}; +use axum::headers::{authorization::Bearer, Authorization, Origin, Referer, UserAgent}; use axum::{ extract::ws::{Message, WebSocket, WebSocketUpgrade}, extract::Path, @@ -20,7 +22,6 @@ use serde_json::{json, value::RawValue}; use std::sync::Arc; use std::{str::from_utf8_mut, sync::atomic::AtomicUsize}; use tracing::{error, error_span, info, trace, Instrument}; -use uuid::Uuid; use crate::{ app::Web3ProxyApp, @@ -28,21 +29,39 @@ use crate::{ }; #[debug_handler] -pub async fn public_websocket_handler( +pub async fn websocket_handler( + bearer: Option>>, Extension(app): Extension>, ClientIp(ip): ClientIp, + origin: Option>, + referer: Option>, + user_agent: Option>, ws_upgrade: Option, ) -> FrontendResult { - let authorization = ip_is_authorized(&app, ip).await?; + let request_span = error_span!("request", %ip, ?referer, ?user_agent); - let request_span = error_span!("request", ?authorization); + let authorized_request = if let Some(TypedHeader(Authorization(bearer))) = bearer { + let origin = origin.map(|x| x.0); + let referer = referer.map(|x| x.0); + let user_agent = user_agent.map(|x| x.0); - let authorization = Arc::new(authorization); + bearer_is_authorized(&app, bearer, ip, origin, referer, user_agent) + .instrument(request_span.clone()) + .await? + } else { + ip_is_authorized(&app, ip) + .instrument(request_span.clone()) + .await? + }; + + let request_span = error_span!("request", ?authorized_request); + + let authorized_request = Arc::new(authorized_request); match ws_upgrade { Some(ws) => Ok(ws .on_upgrade(|socket| { - proxy_web3_socket(app, authorization, socket).instrument(request_span) + proxy_web3_socket(app, authorized_request, socket).instrument(request_span) }) .into_response()), None => { @@ -53,16 +72,20 @@ pub async fn public_websocket_handler( } #[debug_handler] -pub async fn user_websocket_handler( +pub async fn websocket_handler_with_key( Extension(app): Extension>, ClientIp(ip): ClientIp, - Path(user_key): Path, + Path(user_key): Path, origin: Option>, referer: Option>, user_agent: Option>, ws_upgrade: Option, ) -> FrontendResult { - let authorization = key_is_authorized( + let user_key = user_key.parse()?; + + let request_span = error_span!("request", %ip, ?referer, ?user_agent); + + let authorized_request = key_is_authorized( &app, user_key, ip, @@ -70,16 +93,17 @@ pub async fn user_websocket_handler( referer.map(|x| x.0), user_agent.map(|x| x.0), ) + .instrument(request_span.clone()) .await?; // TODO: type that wraps Address and have it censor? would protect us from accidently logging addresses or other user info - let request_span = error_span!("request", ?authorization); + let request_span = error_span!("request", ?authorized_request); - let authorization = Arc::new(authorization); + let authorized_request = Arc::new(authorized_request); match ws_upgrade { Some(ws_upgrade) => Ok(ws_upgrade.on_upgrade(move |socket| { - proxy_web3_socket(app, authorization, socket).instrument(request_span) + proxy_web3_socket(app, authorized_request, socket).instrument(request_span) })), None => { // TODO: store this on the app and use register_template? @@ -90,7 +114,7 @@ pub async fn user_websocket_handler( let user_url = reg .render_template( &app.config.redirect_user_url, - &json!({ "authorization": authorization }), + &json!({ "authorized_request": authorized_request }), ) .unwrap(); @@ -102,7 +126,7 @@ pub async fn user_websocket_handler( async fn proxy_web3_socket( app: Arc, - authorization: Arc, + authorized_request: Arc, socket: WebSocket, ) { // split the websocket so we can read and write concurrently @@ -112,13 +136,18 @@ async fn proxy_web3_socket( let (response_sender, response_receiver) = flume::unbounded::(); tokio::spawn(write_web3_socket(response_receiver, ws_tx)); - tokio::spawn(read_web3_socket(app, authorization, ws_rx, response_sender)); + tokio::spawn(read_web3_socket( + app, + authorized_request, + ws_rx, + response_sender, + )); } /// websockets support a few more methods than http clients async fn handle_socket_payload( app: Arc, - authorization: Arc, + authorized_request: Arc, payload: &str, response_sender: &flume::Sender, subscription_count: &AtomicUsize, @@ -137,7 +166,7 @@ async fn handle_socket_payload( let response = app .eth_subscribe( - authorization.clone(), + authorized_request.clone(), payload, subscription_count, response_sender.clone(), @@ -174,7 +203,10 @@ async fn handle_socket_payload( Ok(response.into()) } - _ => app.proxy_web3_rpc(&authorization, payload.into()).await, + _ => { + app.proxy_web3_rpc(&authorized_request, payload.into()) + .await + } }; (id, response) @@ -200,7 +232,7 @@ async fn handle_socket_payload( async fn read_web3_socket( app: Arc, - authorization: Arc, + authorized_request: Arc, mut ws_rx: SplitStream, response_sender: flume::Sender, ) { @@ -213,7 +245,7 @@ async fn read_web3_socket( Message::Text(payload) => { handle_socket_payload( app.clone(), - authorization.clone(), + authorized_request.clone(), &payload, &response_sender, &subscription_count, @@ -236,7 +268,7 @@ async fn read_web3_socket( handle_socket_payload( app.clone(), - authorization.clone(), + authorized_request.clone(), payload, &response_sender, &subscription_count, diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index 9725b3a1..49b294a6 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -7,9 +7,9 @@ // I wonder how we handle payment // probably have to do manual withdrawals -use super::authorization::login_is_authorized; +use super::authorization::{login_is_authorized, UserKey}; use super::errors::FrontendResult; -use crate::{app::Web3ProxyApp, users::new_api_key}; +use crate::app::Web3ProxyApp; use anyhow::Context; use axum::{ extract::{Path, Query}, @@ -31,7 +31,6 @@ use std::ops::Add; use std::sync::Arc; use time::{Duration, OffsetDateTime}; use ulid::Ulid; -use uuid::Uuid; // TODO: how do we customize axum's error response? I think we probably want an enum that implements IntoResponse instead #[debug_handler] @@ -134,7 +133,7 @@ pub struct PostLogin { #[derive(Serialize)] pub struct PostLoginResponse { bearer_token: Ulid, - api_keys: Vec, + api_keys: Vec, } /// Post to the user endpoint to register or login. @@ -197,9 +196,11 @@ pub async fn post_login( let u = u.insert(&txn).await?; + let user_key = UserKey::new(); + let uk = user_keys::ActiveModel { user_id: sea_orm::Set(u.id), - api_key: sea_orm::Set(new_api_key()), + api_key: sea_orm::Set(user_key.into()), requests_per_minute: sea_orm::Set(app.config.default_requests_per_minute), ..Default::default() }; @@ -216,7 +217,7 @@ pub async fn post_login( let response_json = PostLoginResponse { bearer_token, - api_keys: uks.iter().map(|uk| uk.api_key).collect(), + api_keys: uks.iter().map(|uk| uk.api_key.into()).collect(), }; let response = (StatusCode::CREATED, Json(response_json)).into_response(); @@ -233,7 +234,7 @@ pub async fn post_login( let response_json = PostLoginResponse { bearer_token, - api_keys: uks.iter().map(|uk| uk.api_key).collect(), + api_keys: uks.iter().map(|uk| uk.api_key.into()).collect(), }; let response = (StatusCode::OK, Json(response_json)).into_response(); diff --git a/web3_proxy/src/lib.rs b/web3_proxy/src/lib.rs index 8a0f6a93..3b78fc7f 100644 --- a/web3_proxy/src/lib.rs +++ b/web3_proxy/src/lib.rs @@ -6,4 +6,3 @@ pub mod jsonrpc; pub mod metered; pub mod metrics_frontend; pub mod rpcs; -pub mod users; diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index af066496..171bd2fe 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -91,7 +91,7 @@ impl Web3Connections { /// Will query a specific node or the best available. pub async fn block( &self, - authorization: Option<&Arc>, + authorized_request: Option<&Arc>, hash: &H256, rpc: Option<&Arc>, ) -> anyhow::Result { @@ -106,7 +106,7 @@ impl Web3Connections { // TODO: if error, retry? let block: Block = match rpc { Some(rpc) => { - rpc.wait_for_request_handle(authorization, Duration::from_secs(30)) + rpc.wait_for_request_handle(authorized_request, Duration::from_secs(30)) .await? .request( "eth_getBlockByHash", @@ -122,7 +122,7 @@ impl Web3Connections { let request: JsonRpcRequest = serde_json::from_value(request)?; let response = self - .try_send_best_upstream_server(authorization, request, None) + .try_send_best_upstream_server(authorized_request, request, None) .await?; let block = response.result.unwrap(); @@ -171,7 +171,7 @@ impl Web3Connections { // deref to not keep the lock open if let Some(block_hash) = self.block_numbers.get(num) { // TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set - // TODO: pass authorization through here? + // TODO: pass authorized_request through here? return self.block(None, &block_hash, None).await; } diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index 7cdb1fa5..76c8222a 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -742,13 +742,13 @@ impl Web3Connection { #[instrument] pub async fn wait_for_request_handle( self: &Arc, - authorization: Option<&Arc>, + authorized_request: Option<&Arc>, max_wait: Duration, ) -> anyhow::Result { let max_wait = Instant::now() + max_wait; loop { - let x = self.try_request_handle(authorization).await; + let x = self.try_request_handle(authorized_request).await; trace!(?x, "try_request_handle"); @@ -778,7 +778,7 @@ impl Web3Connection { #[instrument] pub async fn try_request_handle( self: &Arc, - authorization: Option<&Arc>, + authorized_request: Option<&Arc>, ) -> anyhow::Result { // check that we are connected if !self.has_provider().await { @@ -809,7 +809,7 @@ impl Web3Connection { } }; - let handle = OpenRequestHandle::new(self.clone(), authorization.cloned()); + let handle = OpenRequestHandle::new(self.clone(), authorized_request.cloned()); Ok(OpenRequestResult::Handle(handle)) } diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index 193792d6..615821d2 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -368,7 +368,7 @@ impl Web3Connections { /// get the best available rpc server pub async fn next_upstream_server( &self, - authorization: Option<&Arc>, + authorized_request: Option<&Arc>, skip: &[Arc], min_block_needed: Option<&U64>, ) -> anyhow::Result { @@ -427,7 +427,7 @@ impl Web3Connections { // now that the rpcs are sorted, try to get an active request handle for one of them for rpc in synced_rpcs.into_iter() { // increment our connection counter - match rpc.try_request_handle(authorization).await { + match rpc.try_request_handle(authorized_request).await { Ok(OpenRequestResult::Handle(handle)) => { trace!("next server on {:?}: {:?}", self, rpc); return Ok(OpenRequestResult::Handle(handle)); @@ -458,7 +458,7 @@ impl Web3Connections { // TODO: better type on this that can return an anyhow::Result pub async fn upstream_servers( &self, - authorization: Option<&Arc>, + authorized_request: Option<&Arc>, block_needed: Option<&U64>, ) -> Result, Option> { let mut earliest_retry_at = None; @@ -473,7 +473,7 @@ impl Web3Connections { } // check rate limits and increment our connection counter - match connection.try_request_handle(authorization).await { + match connection.try_request_handle(authorized_request).await { Ok(OpenRequestResult::RetryAt(retry_at)) => { // this rpc is not available. skip it earliest_retry_at = earliest_retry_at.min(Some(retry_at)); @@ -499,7 +499,7 @@ impl Web3Connections { /// be sure there is a timeout on this or it might loop forever pub async fn try_send_best_upstream_server( &self, - authorization: Option<&Arc>, + authorized_request: Option<&Arc>, request: JsonRpcRequest, min_block_needed: Option<&U64>, ) -> anyhow::Result { @@ -511,7 +511,7 @@ impl Web3Connections { break; } match self - .next_upstream_server(authorization, &skip_rpcs, min_block_needed) + .next_upstream_server(authorized_request, &skip_rpcs, min_block_needed) .await? { OpenRequestResult::Handle(active_request_handle) => { @@ -601,12 +601,15 @@ impl Web3Connections { #[instrument] pub async fn try_send_all_upstream_servers( &self, - authorization: Option<&Arc>, + authorized_request: Option<&Arc>, request: JsonRpcRequest, block_needed: Option<&U64>, ) -> anyhow::Result { loop { - match self.upstream_servers(authorization, block_needed).await { + match self + .upstream_servers(authorized_request, block_needed) + .await + { Ok(active_request_handles) => { // TODO: benchmark this compared to waiting on unbounded futures // TODO: do something with this handle? diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index cdd47a01..af6363c0 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -33,7 +33,7 @@ pub enum OpenRequestResult { /// Make RPC requests through this handle and drop it when you are done. #[derive(Debug)] pub struct OpenRequestHandle { - authorization: Arc, + authorized_request: Arc, conn: Arc, // TODO: this is the same metrics on the conn. use a reference? metrics: Arc, @@ -104,7 +104,10 @@ impl AuthorizedRequest { #[metered(registry = OpenRequestHandleMetrics, visibility = pub)] impl OpenRequestHandle { - pub fn new(conn: Arc, authorization: Option>) -> Self { + pub fn new( + conn: Arc, + authorized_request: Option>, + ) -> Self { // TODO: take request_id as an argument? // TODO: attach a unique id to this? customer requests have one, but not internal queries // TODO: what ordering?! @@ -119,13 +122,13 @@ impl OpenRequestHandle { let metrics = conn.open_request_handle_metrics.clone(); let used = false.into(); - let authorization = authorization.unwrap_or_else(|| { + let authorized_request = authorized_request.unwrap_or_else(|| { let db_conn = conn.db_conn.clone(); Arc::new(AuthorizedRequest::Internal(db_conn)) }); Self { - authorization, + authorized_request, conn, metrics, used, @@ -161,7 +164,7 @@ impl OpenRequestHandle { // TODO: use tracing spans // TODO: requests from customers have request ids, but we should add // TODO: including params in this is way too verbose - // the authorization field is already on a parent span + // the authorized_request field is already on a parent span trace!(rpc=%self.conn, %method, "request"); let mut provider = None; @@ -193,7 +196,7 @@ impl OpenRequestHandle { let error_handler = if let RequestErrorHandler::SaveReverts(save_chance) = error_handler { if ["eth_call", "eth_estimateGas"].contains(&method) - && self.authorization.db_conn().is_some() + && self.authorized_request.db_conn().is_some() && save_chance != 0.0 && (save_chance == 1.0 || rand::thread_rng().gen_range(0.0..=1.0) <= save_chance) @@ -249,7 +252,7 @@ impl OpenRequestHandle { .expect("parsing eth_call"); // spawn saving to the database so we don't slow down the request (or error if no db) - let f = self.authorization.clone().save_revert(params); + let f = self.authorized_request.clone().save_revert(params); tokio::spawn(async move { f.await }); } else { diff --git a/web3_proxy/src/users.rs b/web3_proxy/src/users.rs deleted file mode 100644 index c54ea1c3..00000000 --- a/web3_proxy/src/users.rs +++ /dev/null @@ -1,11 +0,0 @@ -use rand::prelude::*; -use uuid::{Builder, Uuid}; - -pub fn new_api_key() -> Uuid { - // TODO: chacha20? - let mut rng = thread_rng(); - - let random_bytes = rng.gen(); - - Builder::from_random_bytes(random_bytes).into_uuid() -}