sorting on total difficulty doesnt work with geth websocket

This commit is contained in:
Bryan Stitt 2022-08-07 20:44:56 +00:00
parent 929593227c
commit 31a6efb5f2
8 changed files with 177 additions and 156 deletions

@ -60,10 +60,11 @@
- [x] synced connections swap threshold set to 1 so that it always serves something
- [x] cli tool for creating new users
- [x] incoming rate limiting by api key
- [ ] after a refactor, public rate limit isnt working anymore. i set to 0 but could still request
- [ ] give users different rate limits looked up from the database
- [ ] basic request method stats
- [x] sort forked blocks by total difficulty like geth does
- [x] refactor result type on active handlers to use a cleaner success/error so we can use the try operator
- [x] give users different rate limits looked up from the database
- [ ] allow blocking public requests
- [ ] basic request method stats
## V1

@ -75,7 +75,7 @@ fn run(
let frontend_handle = tokio::spawn(frontend::run(cli_config.port, app));
// if everything is working, these should both run forever
// TODO: select on the shutdown marker, here?
// TODO: try_join these instead? use signal_shutdown here?
tokio::select! {
x = app_handle => {
match x {
@ -94,7 +94,8 @@ fn run(
}
}
_ = shutdown_receiver.recv_async() => {
info!("shutdown signal");
// TODO: think more about this. we need some way for tests to tell the app to stop
info!("received shutdown signal");
// TODO: wait for outstanding requests to complete. graceful shutdown will make our users happier

@ -4,12 +4,13 @@ use arc_swap::ArcSwap;
use counter::Counter;
use dashmap::DashMap;
use derive_more::From;
use ethers::prelude::{Block, ProviderError, Transaction, TxHash, H256, U64};
use ethers::prelude::{Block, ProviderError, Transaction, TxHash, H256, U256, U64};
use futures::future::{join_all, try_join_all};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use hashbrown::HashMap;
use indexmap::{IndexMap, IndexSet};
use std::cmp::Reverse;
// use parking_lot::RwLock;
// use petgraph::graphmap::DiGraphMap;
use serde::ser::{SerializeStruct, Serializer};
@ -729,20 +730,32 @@ impl Web3Connections {
impl<'a> State<'a> {
// TODO: there are sortable traits, but this seems simpler
fn sortable_values(&self) -> (&U64, &u32, usize, &H256) {
/// sort the blocks in descending height
fn sortable_values(&self) -> Reverse<(&U64, &u32, &U256, &H256)> {
trace!(?self.block, ?self.conns);
// first we care about the block number
let block_num = self.block.number.as_ref().unwrap();
// if block_num ties, the block with the highest total difficulty *should* be the winner
// TODO: sometimes i see a block with no total difficulty. websocket subscription doesn't get everything
// let total_difficulty = self.block.total_difficulty.as_ref().expect("wat");
// all the nodes should already be doing this fork priority logic themselves
// so, it should be safe to just look at whatever our node majority thinks and go with that
let sum_soft_limit = &self.sum_soft_limit;
let conns = self.conns.len();
let difficulty = &self.block.difficulty;
// if we are still tied (unlikely). this will definitely break the tie
// TODO: what does geth do?
let block_hash = self.block.hash.as_ref().unwrap();
(block_num, sum_soft_limit, conns, block_hash)
Reverse((block_num, sum_soft_limit, difficulty, block_hash))
}
}
// TODO: i'm always getting None
// TODO: this needs tests
if let Some(x) = rpcs_by_hash
.into_iter()
.filter_map(|(hash, conns)| {

@ -5,7 +5,7 @@ use std::sync::Arc;
use uuid::Uuid;
use super::errors::handle_anyhow_error;
use super::{rate_limit_by_ip, rate_limit_by_key};
use super::rate_limit::{rate_limit_by_ip, rate_limit_by_key};
use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum};
pub async fn public_proxy_web3_rpc(

@ -2,161 +2,21 @@
mod errors;
mod http;
mod http_proxy;
mod rate_limit;
mod users;
mod ws_proxy;
use axum::{
handler::Handler,
response::IntoResponse,
routing::{get, post},
Extension, Router,
};
use entities::user_keys;
use redis_cell_client::ThrottleResult;
use reqwest::StatusCode;
use sea_orm::{
ColumnTrait, DeriveColumn, EntityTrait, EnumIter, IdenStatic, QueryFilter, QuerySelect,
};
use std::net::{IpAddr, SocketAddr};
use std::net::SocketAddr;
use std::sync::Arc;
use tracing::{debug, info};
use uuid::Uuid;
use tracing::info;
use crate::app::Web3ProxyApp;
use self::errors::handle_anyhow_error;
pub async fn rate_limit_by_ip(app: &Web3ProxyApp, ip: &IpAddr) -> Result<(), impl IntoResponse> {
let rate_limiter_key = format!("ip-{}", ip);
// TODO: dry this up with rate_limit_by_key
if let Some(rate_limiter) = app.rate_limiter() {
match rate_limiter
.throttle_key(&rate_limiter_key, None, None, None)
.await
{
Ok(ThrottleResult::Allowed) => {}
Ok(ThrottleResult::RetryAt(_retry_at)) => {
// TODO: set headers so they know when they can retry
debug!(?rate_limiter_key, "rate limit exceeded"); // this is too verbose, but a stat might be good
// TODO: use their id if possible
return Err(handle_anyhow_error(
Some(StatusCode::TOO_MANY_REQUESTS),
None,
anyhow::anyhow!(format!("too many requests from this ip: {}", ip)),
)
.await
.into_response());
}
Err(err) => {
// internal error, not rate limit being hit
// TODO: i really want axum to do this for us in a single place.
return Err(handle_anyhow_error(
Some(StatusCode::INTERNAL_SERVER_ERROR),
None,
anyhow::anyhow!(format!("too many requests from this ip: {}", ip)),
)
.await
.into_response());
}
}
} else {
// TODO: if no redis, rate limit with a local cache?
}
Ok(())
}
/// if Ok(()), rate limits are acceptable
/// if Err(response), rate limits exceeded
pub async fn rate_limit_by_key(
app: &Web3ProxyApp,
user_key: Uuid,
) -> Result<(), impl IntoResponse> {
let db = app.db_conn();
/// query just a few columns instead of the entire table
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryAs {
UserId,
RequestsPerMinute,
}
// query the db to make sure this key is active
// TODO: probably want a cache on this
match user_keys::Entity::find()
.select_only()
.column_as(user_keys::Column::UserId, QueryAs::UserId)
.column_as(
user_keys::Column::RequestsPerMinute,
QueryAs::RequestsPerMinute,
)
.filter(user_keys::Column::ApiKey.eq(user_key))
.filter(user_keys::Column::Active.eq(true))
.into_values::<_, QueryAs>()
.one(db)
.await
{
Ok::<Option<(i64, u32)>, _>(Some((_user_id, user_count_per_period))) => {
// user key is valid
if let Some(rate_limiter) = app.rate_limiter() {
// TODO: how does max burst actually work? what should it be?
let user_max_burst = user_count_per_period / 3;
let user_period = 60;
if rate_limiter
.throttle_key(
&user_key.to_string(),
Some(user_max_burst),
Some(user_count_per_period),
Some(user_period),
)
.await
.is_err()
{
// TODO: set headers so they know when they can retry
// warn!(?ip, "public rate limit exceeded"); // this is too verbose, but a stat might be good
// TODO: use their id if possible
return Err(handle_anyhow_error(
Some(StatusCode::TOO_MANY_REQUESTS),
None,
// TODO: include the user id (NOT THE API KEY!) here
anyhow::anyhow!("too many requests from this key"),
)
.await
.into_response());
}
} else {
// TODO: if no redis, rate limit with a local cache?
}
}
Ok(None) => {
// invalid user key
// TODO: rate limit by ip here, too? maybe tarpit?
return Err(handle_anyhow_error(
Some(StatusCode::FORBIDDEN),
None,
anyhow::anyhow!("unknown api key"),
)
.await
.into_response());
}
Err(err) => {
let err: anyhow::Error = err.into();
return Err(handle_anyhow_error(
Some(StatusCode::INTERNAL_SERVER_ERROR),
None,
err.context("failed checking database for user key"),
)
.await
.into_response());
}
}
Ok(())
}
pub async fn run(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()> {
// build our application with a route
// order most to least common

@ -0,0 +1,146 @@
use axum::response::IntoResponse;
use entities::user_keys;
use redis_cell_client::ThrottleResult;
use reqwest::StatusCode;
use sea_orm::{
ColumnTrait, DeriveColumn, EntityTrait, EnumIter, IdenStatic, QueryFilter, QuerySelect,
};
use std::net::IpAddr;
use tracing::{debug, warn};
use uuid::Uuid;
use crate::app::Web3ProxyApp;
use super::errors::handle_anyhow_error;
pub async fn rate_limit_by_ip(app: &Web3ProxyApp, ip: &IpAddr) -> Result<(), impl IntoResponse> {
let rate_limiter_key = format!("ip-{}", ip);
// TODO: dry this up with rate_limit_by_key
if let Some(rate_limiter) = app.rate_limiter() {
match rate_limiter
.throttle_key(&rate_limiter_key, None, None, None)
.await
{
Ok(ThrottleResult::Allowed) => {}
Ok(ThrottleResult::RetryAt(_retry_at)) => {
// TODO: set headers so they know when they can retry
debug!(?rate_limiter_key, "rate limit exceeded"); // this is too verbose, but a stat might be good
// TODO: use their id if possible
return Err(handle_anyhow_error(
Some(StatusCode::TOO_MANY_REQUESTS),
None,
anyhow::anyhow!(format!("too many requests from this ip: {}", ip)),
)
.await
.into_response());
}
Err(err) => {
// internal error, not rate limit being hit
// TODO: i really want axum to do this for us in a single place.
return Err(handle_anyhow_error(
Some(StatusCode::INTERNAL_SERVER_ERROR),
None,
err,
)
.await
.into_response());
}
}
} else {
// TODO: if no redis, rate limit with a local cache?
warn!("no rate limiter!");
}
Ok(())
}
/// if Ok(()), rate limits are acceptable
/// if Err(response), rate limits exceeded
pub async fn rate_limit_by_key(
app: &Web3ProxyApp,
user_key: Uuid,
) -> Result<(), impl IntoResponse> {
let db = app.db_conn();
/// query just a few columns instead of the entire table
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryAs {
UserId,
RequestsPerMinute,
}
// query the db to make sure this key is active
// TODO: probably want a cache on this
match user_keys::Entity::find()
.select_only()
.column_as(user_keys::Column::UserId, QueryAs::UserId)
.column_as(
user_keys::Column::RequestsPerMinute,
QueryAs::RequestsPerMinute,
)
.filter(user_keys::Column::ApiKey.eq(user_key))
.filter(user_keys::Column::Active.eq(true))
.into_values::<_, QueryAs>()
.one(db)
.await
{
Ok::<Option<(i64, u32)>, _>(Some((_user_id, user_count_per_period))) => {
// user key is valid
if let Some(rate_limiter) = app.rate_limiter() {
// TODO: how does max burst actually work? what should it be?
let user_max_burst = user_count_per_period / 3;
let user_period = 60;
if rate_limiter
.throttle_key(
&user_key.to_string(),
Some(user_max_burst),
Some(user_count_per_period),
Some(user_period),
)
.await
.is_err()
{
// TODO: set headers so they know when they can retry
// warn!(?ip, "public rate limit exceeded"); // this is too verbose, but a stat might be good
// TODO: use their id if possible
return Err(handle_anyhow_error(
Some(StatusCode::TOO_MANY_REQUESTS),
None,
// TODO: include the user id (NOT THE API KEY!) here
anyhow::anyhow!("too many requests from this key"),
)
.await
.into_response());
}
} else {
// TODO: if no redis, rate limit with a local cache?
}
}
Ok(None) => {
// invalid user key
// TODO: rate limit by ip here, too? maybe tarpit?
return Err(handle_anyhow_error(
Some(StatusCode::FORBIDDEN),
None,
anyhow::anyhow!("unknown api key"),
)
.await
.into_response());
}
Err(err) => {
let err: anyhow::Error = err.into();
return Err(handle_anyhow_error(
Some(StatusCode::INTERNAL_SERVER_ERROR),
None,
err.context("failed checking database for user key"),
)
.await
.into_response());
}
}
Ok(())
}

@ -15,7 +15,7 @@ use sea_orm::ActiveModelTrait;
use serde::Deserialize;
use std::sync::Arc;
use crate::{app::Web3ProxyApp, frontend::rate_limit_by_ip};
use crate::{app::Web3ProxyApp, frontend::rate_limit::rate_limit_by_ip};
pub async fn create_user(
// this argument tells axum to parse the request body

@ -22,7 +22,7 @@ use crate::{
jsonrpc::{JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest},
};
use super::{rate_limit_by_ip, rate_limit_by_key};
use super::rate_limit::{rate_limit_by_ip, rate_limit_by_key};
pub async fn public_websocket_handler(
Extension(app): Extension<Arc<Web3ProxyApp>>,