diff --git a/TODO.md b/TODO.md index 18585ce5..1bffa5e9 100644 --- a/TODO.md +++ b/TODO.md @@ -199,4 +199,5 @@ in another repo: event subscriber 2022-07-22T23:52:26.041140Z WARN block_receiver: web3_proxy::connections: chain is forked! 2 possible heads. 2/4/4 rpcs have 0x70e8…48e0 rpc=Web3Connection { url: "http://127.0.0.1:8549", data: "archive", .. } new_block_num=15195517 - [ ] threshold should check actual available request limits (if any) instead of just the soft limit - [ ] foreign key on_update and on_delete -- [ ] database creation timestamps \ No newline at end of file +- [ ] database creation timestamps +- [ ] better error handling. we warn too often for validation errors and use the same error code for most every request \ No newline at end of file diff --git a/web3_proxy/src/bin/web3_proxy_cli/main.rs b/web3_proxy/src/bin/web3_proxy_cli/main.rs index 1cc0e263..4437815e 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/main.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/main.rs @@ -8,7 +8,7 @@ use web3_proxy::app::get_migrated_db; #[derive(Debug, FromArgs)] /// Command line interface for admins to interact with web3-proxy pub struct TopConfig { - /// what host the client should connect to + /// what database the client should connect to #[argh( option, default = "\"mysql://root:dev_web3_proxy@127.0.0.1:3306/dev_web3_proxy\".to_string()" diff --git a/web3_proxy/src/connections.rs b/web3_proxy/src/connections.rs index 9f42e23f..b0be195d 100644 --- a/web3_proxy/src/connections.rs +++ b/web3_proxy/src/connections.rs @@ -819,7 +819,7 @@ impl Web3Connections { if new_head_block { self.chain.add_block(new_block.clone(), true); - info!( + debug!( "{}/{} rpcs at {} ({}). head at {:?}", pending_synced_connections.conns.len(), self.conns.len(), @@ -847,7 +847,7 @@ impl Web3Connections { // TODO: mark any orphaned transactions as unconfirmed } } else if num_best_rpcs == self.conns.len() { - debug!( + trace!( "all {} rpcs at {} ({})", num_best_rpcs, pending_synced_connections.head_block_hash, diff --git a/web3_proxy/src/frontend/http_proxy.rs b/web3_proxy/src/frontend/http_proxy.rs index ec40130c..055fa768 100644 --- a/web3_proxy/src/frontend/http_proxy.rs +++ b/web3_proxy/src/frontend/http_proxy.rs @@ -1,6 +1,8 @@ +use axum::extract::Path; use axum::{http::StatusCode, response::IntoResponse, Extension, Json}; use axum_client_ip::ClientIp; use std::sync::Arc; +use uuid::Uuid; use super::errors::handle_anyhow_error; use super::{rate_limit_by_ip, rate_limit_by_key}; @@ -24,9 +26,9 @@ pub async fn public_proxy_web3_rpc( pub async fn user_proxy_web3_rpc( Json(payload): Json, Extension(app): Extension>, - key: String, + Path(user_key): Path, ) -> impl IntoResponse { - if let Err(x) = rate_limit_by_key(&app, &key).await { + if let Err(x) = rate_limit_by_key(&app, user_key).await { return x.into_response(); } diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index b72d43e6..678c39a2 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -17,6 +17,7 @@ use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect}; use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use tracing::debug; +use uuid::Uuid; use crate::app::Web3ProxyApp; @@ -25,56 +26,11 @@ use self::errors::handle_anyhow_error; pub async fn rate_limit_by_ip(app: &Web3ProxyApp, ip: &IpAddr) -> Result<(), impl IntoResponse> { let rate_limiter_key = format!("ip:{}", ip); - rate_limit_by_key(app, &rate_limiter_key).await -} - -/// if Ok(()), rate limits are acceptable -/// if Err(response), rate limits exceeded -pub async fn rate_limit_by_key( - app: &Web3ProxyApp, - user_key: &str, -) -> Result<(), impl IntoResponse> { - let db = app.db_conn(); - - // query the db to make sure this key is active - // TODO: probably want a cache on this - match user_keys::Entity::find() - .select_only() - .column(user_keys::Column::UserId) - .filter(user_keys::Column::ApiKey.eq(user_key)) - .filter(user_keys::Column::Active.eq(true)) - .one(db) - .await - { - Ok(Some(_)) => { - // user key is valid - } - Ok(None) => { - // invalid user key - // TODO: rate limit by ip here, too? maybe tarpit? - return Err(handle_anyhow_error( - Some(StatusCode::FORBIDDEN), - None, - anyhow::anyhow!("unknown api key"), - ) - .await - .into_response()); - } - Err(e) => { - return Err(handle_anyhow_error( - Some(StatusCode::INTERNAL_SERVER_ERROR), - None, - e.into(), - ) - .await - .into_response()); - } - } - + // TODO: dry this up with rate_limit_by_key if let Some(rate_limiter) = app.rate_limiter() { - if rate_limiter.throttle_key(user_key).await.is_err() { + if rate_limiter.throttle_key(&rate_limiter_key).await.is_err() { // TODO: set headers so they know when they can retry - // warn!(?ip, "public rate limit exceeded"); + // warn!(?ip, "public rate limit exceeded"); // this is too verbose, but a stat might be good // TODO: use their id if possible return Err(handle_anyhow_error( Some(StatusCode::TOO_MANY_REQUESTS), @@ -91,14 +47,82 @@ pub async fn rate_limit_by_key( Ok(()) } +/// if Ok(()), rate limits are acceptable +/// if Err(response), rate limits exceeded +pub async fn rate_limit_by_key( + app: &Web3ProxyApp, + user_key: Uuid, +) -> Result<(), impl IntoResponse> { + let db = app.db_conn(); + + // query the db to make sure this key is active + // TODO: probably want a cache on this + match user_keys::Entity::find() + // .select_only() + // .column(user_keys::Column::UserId) + .filter(user_keys::Column::ApiKey.eq(user_key)) + .filter(user_keys::Column::Active.eq(true)) + .one(db) + .await + { + Ok(Some(_)) => { + // user key is valid + if let Some(rate_limiter) = app.rate_limiter() { + if rate_limiter + .throttle_key(&user_key.to_string()) + .await + .is_err() + { + // TODO: set headers so they know when they can retry + // warn!(?ip, "public rate limit exceeded"); // this is too verbose, but a stat might be good + // TODO: use their id if possible + return Err(handle_anyhow_error( + Some(StatusCode::TOO_MANY_REQUESTS), + None, + anyhow::anyhow!("too many requests"), + ) + .await + .into_response()); + } + } else { + // TODO: if no redis, rate limit with a local cache? + } + } + Ok(None) => { + // invalid user key + // TODO: rate limit by ip here, too? maybe tarpit? + return Err(handle_anyhow_error( + Some(StatusCode::FORBIDDEN), + None, + anyhow::anyhow!("unknown api key"), + ) + .await + .into_response()); + } + Err(err) => { + let err: anyhow::Error = err.into(); + + return Err(handle_anyhow_error( + Some(StatusCode::INTERNAL_SERVER_ERROR), + None, + err.context("failed checking database for user key"), + ) + .await + .into_response()); + } + } + + Ok(()) +} + pub async fn run(port: u16, proxy_app: Arc) -> anyhow::Result<()> { // build our application with a route // order most to least common let app = Router::new() .route("/", post(http_proxy::public_proxy_web3_rpc)) .route("/", get(ws_proxy::public_websocket_handler)) - .route("/u/:key", post(http_proxy::user_proxy_web3_rpc)) - .route("/u/:key", get(ws_proxy::user_websocket_handler)) + .route("/u/:user_key", post(http_proxy::user_proxy_web3_rpc)) + .route("/u/:user_key", get(ws_proxy::user_websocket_handler)) .route("/health", get(http::health)) .route("/status", get(http::status)) .route("/users", post(users::create_user)) @@ -111,7 +135,7 @@ pub async fn run(port: u16, proxy_app: Arc) -> anyhow::Result<()> // `axum::Server` is a re-export of `hyper::Server` // TODO: allow only listening on localhost? let addr = SocketAddr::from(([0, 0, 0, 0], port)); - debug!("listening on port {}", port); + info!("listening on port {}", port); // TODO: into_make_service is enough if we always run behind a proxy. make into_make_service_with_connect_info optional? axum::Server::bind(&addr) .serve(app.into_make_service_with_connect_info::()) diff --git a/web3_proxy/src/frontend/ws_proxy.rs b/web3_proxy/src/frontend/ws_proxy.rs index 9894b4c1..03a1d803 100644 --- a/web3_proxy/src/frontend/ws_proxy.rs +++ b/web3_proxy/src/frontend/ws_proxy.rs @@ -1,5 +1,6 @@ use axum::{ extract::ws::{Message, WebSocket, WebSocketUpgrade}, + extract::Path, response::IntoResponse, Extension, }; @@ -14,6 +15,7 @@ use serde_json::{json, value::RawValue}; use std::sync::Arc; use std::{str::from_utf8_mut, sync::atomic::AtomicUsize}; use tracing::{error, info, trace}; +use uuid::Uuid; use crate::{ app::Web3ProxyApp, @@ -37,9 +39,9 @@ pub async fn public_websocket_handler( pub async fn user_websocket_handler( Extension(app): Extension>, ws: WebSocketUpgrade, - key: String, + Path(user_key): Path, ) -> impl IntoResponse { - if let Err(x) = rate_limit_by_key(&app, &key).await { + if let Err(x) = rate_limit_by_key(&app, user_key).await { return x.into_response(); } diff --git a/web3_proxy/src/jsonrpc.rs b/web3_proxy/src/jsonrpc.rs index 59c18ddf..f5b1a7b8 100644 --- a/web3_proxy/src/jsonrpc.rs +++ b/web3_proxy/src/jsonrpc.rs @@ -167,6 +167,7 @@ impl JsonRpcForwardedResponse { pub fn from_anyhow_error(err: anyhow::Error, id: Box) -> Self { let err = format!("{:?}", err); + // TODO: this is too verbose. plenty of errors are valid, like users giving an invalid address. no need to log that warn!("forwarding error. {:?}", err); JsonRpcForwardedResponse {