diff --git a/TODO.md b/TODO.md index 773f28d8..df1bbb51 100644 --- a/TODO.md +++ b/TODO.md @@ -60,10 +60,11 @@ - [x] synced connections swap threshold set to 1 so that it always serves something - [x] cli tool for creating new users - [x] incoming rate limiting by api key -- [ ] after a refactor, public rate limit isnt working anymore. i set to 0 but could still request -- [ ] give users different rate limits looked up from the database -- [ ] basic request method stats +- [x] sort forked blocks by total difficulty like geth does +- [x] refactor result type on active handlers to use a cleaner success/error so we can use the try operator +- [x] give users different rate limits looked up from the database - [ ] allow blocking public requests +- [ ] basic request method stats ## V1 diff --git a/web3_proxy/src/bin/web3_proxy.rs b/web3_proxy/src/bin/web3_proxy.rs index a867cd7e..f22d177b 100644 --- a/web3_proxy/src/bin/web3_proxy.rs +++ b/web3_proxy/src/bin/web3_proxy.rs @@ -75,7 +75,7 @@ fn run( let frontend_handle = tokio::spawn(frontend::run(cli_config.port, app)); // if everything is working, these should both run forever - // TODO: select on the shutdown marker, here? + // TODO: try_join these instead? use signal_shutdown here? tokio::select! { x = app_handle => { match x { @@ -94,7 +94,8 @@ fn run( } } _ = shutdown_receiver.recv_async() => { - info!("shutdown signal"); + // TODO: think more about this. we need some way for tests to tell the app to stop + info!("received shutdown signal"); // TODO: wait for outstanding requests to complete. graceful shutdown will make our users happier diff --git a/web3_proxy/src/connections.rs b/web3_proxy/src/connections.rs index 77536935..21a87702 100644 --- a/web3_proxy/src/connections.rs +++ b/web3_proxy/src/connections.rs @@ -4,12 +4,13 @@ use arc_swap::ArcSwap; use counter::Counter; use dashmap::DashMap; use derive_more::From; -use ethers::prelude::{Block, ProviderError, Transaction, TxHash, H256, U64}; +use ethers::prelude::{Block, ProviderError, Transaction, TxHash, H256, U256, U64}; use futures::future::{join_all, try_join_all}; use futures::stream::FuturesUnordered; use futures::StreamExt; use hashbrown::HashMap; use indexmap::{IndexMap, IndexSet}; +use std::cmp::Reverse; // use parking_lot::RwLock; // use petgraph::graphmap::DiGraphMap; use serde::ser::{SerializeStruct, Serializer}; @@ -729,20 +730,32 @@ impl Web3Connections { impl<'a> State<'a> { // TODO: there are sortable traits, but this seems simpler - fn sortable_values(&self) -> (&U64, &u32, usize, &H256) { + /// sort the blocks in descending height + fn sortable_values(&self) -> Reverse<(&U64, &u32, &U256, &H256)> { + trace!(?self.block, ?self.conns); + + // first we care about the block number let block_num = self.block.number.as_ref().unwrap(); + // if block_num ties, the block with the highest total difficulty *should* be the winner + // TODO: sometimes i see a block with no total difficulty. websocket subscription doesn't get everything + // let total_difficulty = self.block.total_difficulty.as_ref().expect("wat"); + + // all the nodes should already be doing this fork priority logic themselves + // so, it should be safe to just look at whatever our node majority thinks and go with that let sum_soft_limit = &self.sum_soft_limit; - let conns = self.conns.len(); + let difficulty = &self.block.difficulty; + // if we are still tied (unlikely). this will definitely break the tie + // TODO: what does geth do? let block_hash = self.block.hash.as_ref().unwrap(); - (block_num, sum_soft_limit, conns, block_hash) + Reverse((block_num, sum_soft_limit, difficulty, block_hash)) } } - // TODO: i'm always getting None + // TODO: this needs tests if let Some(x) = rpcs_by_hash .into_iter() .filter_map(|(hash, conns)| { diff --git a/web3_proxy/src/frontend/http_proxy.rs b/web3_proxy/src/frontend/http_proxy.rs index 055fa768..325e5cdc 100644 --- a/web3_proxy/src/frontend/http_proxy.rs +++ b/web3_proxy/src/frontend/http_proxy.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use uuid::Uuid; use super::errors::handle_anyhow_error; -use super::{rate_limit_by_ip, rate_limit_by_key}; +use super::rate_limit::{rate_limit_by_ip, rate_limit_by_key}; use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum}; pub async fn public_proxy_web3_rpc( diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index 5ee87b95..f9767bdf 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -2,161 +2,21 @@ mod errors; mod http; mod http_proxy; +mod rate_limit; mod users; mod ws_proxy; use axum::{ handler::Handler, - response::IntoResponse, routing::{get, post}, Extension, Router, }; -use entities::user_keys; -use redis_cell_client::ThrottleResult; -use reqwest::StatusCode; -use sea_orm::{ - ColumnTrait, DeriveColumn, EntityTrait, EnumIter, IdenStatic, QueryFilter, QuerySelect, -}; -use std::net::{IpAddr, SocketAddr}; +use std::net::SocketAddr; use std::sync::Arc; -use tracing::{debug, info}; -use uuid::Uuid; +use tracing::info; use crate::app::Web3ProxyApp; -use self::errors::handle_anyhow_error; - -pub async fn rate_limit_by_ip(app: &Web3ProxyApp, ip: &IpAddr) -> Result<(), impl IntoResponse> { - let rate_limiter_key = format!("ip-{}", ip); - - // TODO: dry this up with rate_limit_by_key - if let Some(rate_limiter) = app.rate_limiter() { - match rate_limiter - .throttle_key(&rate_limiter_key, None, None, None) - .await - { - Ok(ThrottleResult::Allowed) => {} - Ok(ThrottleResult::RetryAt(_retry_at)) => { - // TODO: set headers so they know when they can retry - debug!(?rate_limiter_key, "rate limit exceeded"); // this is too verbose, but a stat might be good - // TODO: use their id if possible - return Err(handle_anyhow_error( - Some(StatusCode::TOO_MANY_REQUESTS), - None, - anyhow::anyhow!(format!("too many requests from this ip: {}", ip)), - ) - .await - .into_response()); - } - Err(err) => { - // internal error, not rate limit being hit - // TODO: i really want axum to do this for us in a single place. - return Err(handle_anyhow_error( - Some(StatusCode::INTERNAL_SERVER_ERROR), - None, - anyhow::anyhow!(format!("too many requests from this ip: {}", ip)), - ) - .await - .into_response()); - } - } - } else { - // TODO: if no redis, rate limit with a local cache? - } - - Ok(()) -} - -/// if Ok(()), rate limits are acceptable -/// if Err(response), rate limits exceeded -pub async fn rate_limit_by_key( - app: &Web3ProxyApp, - user_key: Uuid, -) -> Result<(), impl IntoResponse> { - let db = app.db_conn(); - - /// query just a few columns instead of the entire table - #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] - enum QueryAs { - UserId, - RequestsPerMinute, - } - - // query the db to make sure this key is active - // TODO: probably want a cache on this - match user_keys::Entity::find() - .select_only() - .column_as(user_keys::Column::UserId, QueryAs::UserId) - .column_as( - user_keys::Column::RequestsPerMinute, - QueryAs::RequestsPerMinute, - ) - .filter(user_keys::Column::ApiKey.eq(user_key)) - .filter(user_keys::Column::Active.eq(true)) - .into_values::<_, QueryAs>() - .one(db) - .await - { - Ok::, _>(Some((_user_id, user_count_per_period))) => { - // user key is valid - if let Some(rate_limiter) = app.rate_limiter() { - // TODO: how does max burst actually work? what should it be? - let user_max_burst = user_count_per_period / 3; - let user_period = 60; - - if rate_limiter - .throttle_key( - &user_key.to_string(), - Some(user_max_burst), - Some(user_count_per_period), - Some(user_period), - ) - .await - .is_err() - { - // TODO: set headers so they know when they can retry - // warn!(?ip, "public rate limit exceeded"); // this is too verbose, but a stat might be good - // TODO: use their id if possible - return Err(handle_anyhow_error( - Some(StatusCode::TOO_MANY_REQUESTS), - None, - // TODO: include the user id (NOT THE API KEY!) here - anyhow::anyhow!("too many requests from this key"), - ) - .await - .into_response()); - } - } else { - // TODO: if no redis, rate limit with a local cache? - } - } - Ok(None) => { - // invalid user key - // TODO: rate limit by ip here, too? maybe tarpit? - return Err(handle_anyhow_error( - Some(StatusCode::FORBIDDEN), - None, - anyhow::anyhow!("unknown api key"), - ) - .await - .into_response()); - } - Err(err) => { - let err: anyhow::Error = err.into(); - - return Err(handle_anyhow_error( - Some(StatusCode::INTERNAL_SERVER_ERROR), - None, - err.context("failed checking database for user key"), - ) - .await - .into_response()); - } - } - - Ok(()) -} - pub async fn run(port: u16, proxy_app: Arc) -> anyhow::Result<()> { // build our application with a route // order most to least common diff --git a/web3_proxy/src/frontend/rate_limit.rs b/web3_proxy/src/frontend/rate_limit.rs new file mode 100644 index 00000000..41e3f8c0 --- /dev/null +++ b/web3_proxy/src/frontend/rate_limit.rs @@ -0,0 +1,146 @@ +use axum::response::IntoResponse; +use entities::user_keys; +use redis_cell_client::ThrottleResult; +use reqwest::StatusCode; +use sea_orm::{ + ColumnTrait, DeriveColumn, EntityTrait, EnumIter, IdenStatic, QueryFilter, QuerySelect, +}; +use std::net::IpAddr; +use tracing::{debug, warn}; +use uuid::Uuid; + +use crate::app::Web3ProxyApp; + +use super::errors::handle_anyhow_error; + +pub async fn rate_limit_by_ip(app: &Web3ProxyApp, ip: &IpAddr) -> Result<(), impl IntoResponse> { + let rate_limiter_key = format!("ip-{}", ip); + + // TODO: dry this up with rate_limit_by_key + if let Some(rate_limiter) = app.rate_limiter() { + match rate_limiter + .throttle_key(&rate_limiter_key, None, None, None) + .await + { + Ok(ThrottleResult::Allowed) => {} + Ok(ThrottleResult::RetryAt(_retry_at)) => { + // TODO: set headers so they know when they can retry + debug!(?rate_limiter_key, "rate limit exceeded"); // this is too verbose, but a stat might be good + // TODO: use their id if possible + return Err(handle_anyhow_error( + Some(StatusCode::TOO_MANY_REQUESTS), + None, + anyhow::anyhow!(format!("too many requests from this ip: {}", ip)), + ) + .await + .into_response()); + } + Err(err) => { + // internal error, not rate limit being hit + // TODO: i really want axum to do this for us in a single place. + return Err(handle_anyhow_error( + Some(StatusCode::INTERNAL_SERVER_ERROR), + None, + err, + ) + .await + .into_response()); + } + } + } else { + // TODO: if no redis, rate limit with a local cache? + warn!("no rate limiter!"); + } + + Ok(()) +} + +/// if Ok(()), rate limits are acceptable +/// if Err(response), rate limits exceeded +pub async fn rate_limit_by_key( + app: &Web3ProxyApp, + user_key: Uuid, +) -> Result<(), impl IntoResponse> { + let db = app.db_conn(); + + /// query just a few columns instead of the entire table + #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] + enum QueryAs { + UserId, + RequestsPerMinute, + } + + // query the db to make sure this key is active + // TODO: probably want a cache on this + match user_keys::Entity::find() + .select_only() + .column_as(user_keys::Column::UserId, QueryAs::UserId) + .column_as( + user_keys::Column::RequestsPerMinute, + QueryAs::RequestsPerMinute, + ) + .filter(user_keys::Column::ApiKey.eq(user_key)) + .filter(user_keys::Column::Active.eq(true)) + .into_values::<_, QueryAs>() + .one(db) + .await + { + Ok::, _>(Some((_user_id, user_count_per_period))) => { + // user key is valid + if let Some(rate_limiter) = app.rate_limiter() { + // TODO: how does max burst actually work? what should it be? + let user_max_burst = user_count_per_period / 3; + let user_period = 60; + + if rate_limiter + .throttle_key( + &user_key.to_string(), + Some(user_max_burst), + Some(user_count_per_period), + Some(user_period), + ) + .await + .is_err() + { + // TODO: set headers so they know when they can retry + // warn!(?ip, "public rate limit exceeded"); // this is too verbose, but a stat might be good + // TODO: use their id if possible + return Err(handle_anyhow_error( + Some(StatusCode::TOO_MANY_REQUESTS), + None, + // TODO: include the user id (NOT THE API KEY!) here + anyhow::anyhow!("too many requests from this key"), + ) + .await + .into_response()); + } + } else { + // TODO: if no redis, rate limit with a local cache? + } + } + Ok(None) => { + // invalid user key + // TODO: rate limit by ip here, too? maybe tarpit? + return Err(handle_anyhow_error( + Some(StatusCode::FORBIDDEN), + None, + anyhow::anyhow!("unknown api key"), + ) + .await + .into_response()); + } + Err(err) => { + let err: anyhow::Error = err.into(); + + return Err(handle_anyhow_error( + Some(StatusCode::INTERNAL_SERVER_ERROR), + None, + err.context("failed checking database for user key"), + ) + .await + .into_response()); + } + } + + Ok(()) +} diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index 95b4af2e..c957c21d 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -15,7 +15,7 @@ use sea_orm::ActiveModelTrait; use serde::Deserialize; use std::sync::Arc; -use crate::{app::Web3ProxyApp, frontend::rate_limit_by_ip}; +use crate::{app::Web3ProxyApp, frontend::rate_limit::rate_limit_by_ip}; pub async fn create_user( // this argument tells axum to parse the request body diff --git a/web3_proxy/src/frontend/ws_proxy.rs b/web3_proxy/src/frontend/ws_proxy.rs index 8115762f..32638acf 100644 --- a/web3_proxy/src/frontend/ws_proxy.rs +++ b/web3_proxy/src/frontend/ws_proxy.rs @@ -22,7 +22,7 @@ use crate::{ jsonrpc::{JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest}, }; -use super::{rate_limit_by_ip, rate_limit_by_key}; +use super::rate_limit::{rate_limit_by_ip, rate_limit_by_key}; pub async fn public_websocket_handler( Extension(app): Extension>,