improve rate limiting and request counters

Bryan Stitt 2022-09-06 20:12:45 +00:00
parent 98265424bb
commit c34e8ef383
12 changed files with 93 additions and 50 deletions


@@ -7,7 +7,7 @@ min_sum_soft_limit = 2000
 min_synced_rpcs = 2
 redis_url = "redis://dev-redis:6379/"
 # TODO: how do we find the optimal redis_max_connections? too high actually ends up being slower
-redis_max_connections = 99
+redis_max_connections = 300
 redirect_public_url = "https://llamanodes.com/free-rpc-stats"
 redirect_user_url = "https://llamanodes.com/user-rpc-stats/{{user_id}}"
 public_rate_limit_per_minute = 0
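The optimal value for redis_max_connections is still an open question (per the TODO above); too large a pool can be slower than a well-sized one. For reference, a value like this typically feeds bb8's pool builder. A minimal sketch, assuming the pool is constructed roughly like this (the helper and its name are illustrative, not this repo's actual wiring):

    use bb8_redis::{bb8, RedisConnectionManager};

    // hypothetical helper: build the shared redis pool from the config values
    async fn build_redis_pool(
        redis_url: &str,
        redis_max_connections: u32,
    ) -> anyhow::Result<bb8::Pool<RedisConnectionManager>> {
        let manager = RedisConnectionManager::new(redis_url)?;

        // worth benchmarking: a pool that is too large tends to be slower
        // than a smaller, well-sized one
        let pool = bb8::Pool::builder()
            .max_size(redis_max_connections)
            .build(manager)
            .await?;

        Ok(pool)
    }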


@@ -1,14 +1,14 @@
 pub use bb8_redis::bb8::ErrorSink as Bb8ErrorSync;
 pub use bb8_redis::redis::RedisError;
-use tracing::warn;
+use tracing::error;

 #[derive(Debug, Clone)]
 pub struct RedisErrorSink;

 impl Bb8ErrorSync<RedisError> for RedisErrorSink {
     fn sink(&self, err: RedisError) {
-        warn!(?err, "redis error");
+        error!(?err, "redis error");
     }

     fn boxed_clone(&self) -> Box<dyn Bb8ErrorSync<RedisError>> {


@@ -5,7 +5,7 @@ use anyhow::Context;
 use bb8_redis::redis::pipe;
 use std::ops::Add;
 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
-use tracing::trace;
+use tracing::{debug, trace};

 pub use crate::errors::{RedisError, RedisErrorSink};
 pub use bb8_redis::{bb8, redis, RedisConnectionManager};
@@ -72,12 +72,15 @@ impl RedisRateLimit
         let mut conn = self.pool.get().await?;

+        // TODO: at high concurency, i think this is giving errors
+        // TODO: i'm starting to think that bb8 has a bug
         let x: Vec<u64> = pipe()
             // we could get the key first, but that means an extra redis call for every check. this seems better
             .incr(&throttle_key, count)
-            // set expiration the first time we set the key. ignore the result
+            // set expiration each time we set the key. ignore the result
             .expire(&throttle_key, self.period as usize)
-            // .arg("NX") // TODO: this works in redis, but not elasticache
+            // TODO: NX will make it only set the expiration the first time. works in redis, but not elasticache
+            // .arg("NX")
             .ignore()
             // do the query
             .query_async(&mut *conn)
@@ -91,12 +94,13 @@ impl RedisRateLimit
             let retry_at = Instant::now().add(Duration::from_secs_f32(seconds_left_in_period));

-            trace!(%label, ?retry_at, "rate limited");
-            return Ok(ThrottleResult::RetryAt(retry_at));
+            debug!(%label, ?retry_at, "rate limited: {}/{}", new_count, max_per_period);
+            Ok(ThrottleResult::RetryAt(retry_at))
+        } else {
+            trace!(%label, "NOT rate limited: {}/{}", new_count, max_per_period);
+            Ok(ThrottleResult::Allowed)
         }
-
-        Ok(ThrottleResult::Allowed)
     }

     #[inline]
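The throttle above is a fixed-window limiter: each call INCRs a per-label key, refreshes its TTL to one period, and compares the returned count against the maximum. The NX TODO refers to EXPIRE's NX option, which would set the TTL only on the first increment (a strict window) but was not supported by Elasticache at the time. A standalone sketch of the same idea using the same pipeline calls; the function and its signature are illustrative, not this crate's API:

    use std::time::Duration;

    use bb8_redis::redis::{aio::Connection, pipe, RedisResult};

    // minimal fixed-window check against a raw async connection; the real code
    // goes through the bb8 pool and returns a ThrottleResult instead of a bool
    async fn fixed_window_allowed(
        conn: &mut Connection,
        key: &str,
        max_per_period: u64,
        period: Duration,
    ) -> RedisResult<bool> {
        // INCR and EXPIRE in one round trip. without NX the TTL is refreshed on
        // every call, so a constantly-busy key keeps sliding instead of expiring
        // exactly once per period
        let counts: Vec<u64> = pipe()
            .incr(key, 1u64)
            .expire(key, period.as_secs() as usize)
            .ignore()
            .query_async(conn)
            .await?;

        // only one command result is kept (the expire is ignored)
        let new_count = counts.first().copied().unwrap_or(u64::MAX);

        Ok(new_count <= max_per_period)
    }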


@@ -39,7 +39,7 @@ use tokio::sync::{broadcast, watch};
 use tokio::task::JoinHandle;
 use tokio::time::{timeout, Instant};
 use tokio_stream::wrappers::{BroadcastStream, WatchStream};
-use tracing::{debug, info, info_span, instrument, trace, warn, Instrument};
+use tracing::{info, info_span, instrument, trace, warn, Instrument};
 use uuid::Uuid;

 // TODO: make this customizable?
@@ -78,8 +78,6 @@ pub struct Web3ProxyApp
     // TODO: broadcast channel instead?
     head_block_receiver: watch::Receiver<ArcBlock>,
     pending_tx_sender: broadcast::Sender<TxStatus>,
-    /// TODO: this doesn't ever get incremented!
-    pub active_requests: AtomicUsize,
     pub config: AppConfig,
     pub db_conn: Option<sea_orm::DatabaseConnection>,
     /// store pending transactions that we've seen so that we don't send duplicates to subscribers
@@ -265,6 +263,7 @@ impl Web3ProxyApp {
         handles.push(balanced_handle);

         let private_rpcs = if private_rpcs.is_empty() {
+            // TODO: do None instead of clone?
             warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");
             balanced_rpcs.clone()
         } else {
@@ -310,7 +309,6 @@ impl Web3ProxyApp {
             config: top_config.app,
             balanced_rpcs,
             private_rpcs,
-            active_requests: Default::default(),
             response_cache,
             head_block_receiver,
             pending_tx_sender,
@@ -527,8 +525,7 @@ impl Web3ProxyApp {
         request: JsonRpcRequestEnum,
     ) -> anyhow::Result<JsonRpcForwardedResponseEnum> {
         // TODO: this should probably be trace level
-        // trace!(?request, "proxy_web3_rpc");
-        debug!(?request, "proxying request");
+        trace!(?request, "proxy_web3_rpc");

         // even though we have timeouts on the requests to our backend providers,
         // we need a timeout for the incoming request so that retries don't run forever
@@ -545,8 +542,7 @@ impl Web3ProxyApp {
         };

         // TODO: this should probably be trace level
-        // trace!(?response, "Forwarding");
-        debug!(?response.ids(), "forwarding response");
+        trace!(?response, "Forwarding");

         Ok(response)
     }


@@ -127,7 +127,7 @@ fn main() -> anyhow::Result<()> {
     // if RUST_LOG isn't set, configure a default
     // TODO: is there a better way to do this?
     if std::env::var("RUST_LOG").is_err() {
-        std::env::set_var("RUST_LOG", "info,web3_proxy=debug");
+        std::env::set_var("RUST_LOG", "info,redis_rate_limit=debug,web3_proxy=debug");
     }

     // install global collector configured based on RUST_LOG env var.
// install global collector configured based on RUST_LOG env var. // install global collector configured based on RUST_LOG env var.


@@ -1,7 +1,7 @@
 use crate::app::Web3ProxyApp;
 use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
 use serde_json::json;
-use std::sync::{atomic::Ordering, Arc};
+use std::sync::Arc;

 /// Health check page for load balancers to use
 pub async fn health(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
@@ -15,15 +15,13 @@ pub async fn health(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
 /// Very basic status page
 /// TODO: replace this with proper stats and monitoring
 pub async fn status(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
-    // TODO: what else should we include? uptime? prometheus?
+    // TODO: what else should we include? uptime?
     let body = json!({
         "balanced_rpcs": app.balanced_rpcs,
         "private_rpcs": app.private_rpcs,
-        "num_active_requests": app.active_requests.load(Ordering::Acquire),
-        // TODO: include number of items?
        "pending_transactions_count": app.pending_transactions.entry_count(),
         "pending_transactions_size": app.pending_transactions.weighted_size(),
     });

-    (StatusCode::OK, Json(body))
+    Json(body)
 }


@@ -11,7 +11,7 @@ use sea_orm::{
 };
 use std::{net::IpAddr, time::Duration};
 use tokio::time::Instant;
-use tracing::debug;
+use tracing::{debug, error};
 use uuid::Uuid;

 pub enum RateLimitResult {
@@ -119,6 +119,7 @@ impl TryFrom<RateLimitResult> for RequestFrom {
 impl Web3ProxyApp {
     pub async fn rate_limit_by_ip(&self, ip: IpAddr) -> anyhow::Result<RateLimitResult> {
         // TODO: dry this up with rate_limit_by_key
+        // TODO: have a local cache because if we hit redis too hard we get errors
         if let Some(rate_limiter) = &self.rate_limiter {
             let rate_limiter_label = format!("ip-{}", ip);
@@ -136,12 +137,13 @@ impl Web3ProxyApp {
                 }
                 Ok(ThrottleResult::RetryNever) => {
                     // TODO: prettier error for the user
-                    return Err(anyhow::anyhow!("blocked by rate limiter"));
+                    return Err(anyhow::anyhow!("ip blocked by rate limiter"));
                 }
                 Err(err) => {
                     // internal error, not rate limit being hit
                     // TODO: i really want axum to do this for us in a single place.
-                    return Err(err);
+                    error!(?err, "redis is unhappy. allowing ip");
+                    return Ok(RateLimitResult::AllowedIp(ip));
                 }
             }
         } else {
@@ -194,7 +196,7 @@ impl Web3ProxyApp {
             }
         };

+        // save for the next run
         self.user_cache.insert(user_key, user_data).await;

         Ok(user_data)
@@ -234,20 +236,39 @@ impl Web3ProxyApp {
         // user key is valid. now check rate limits
         if let Some(rate_limiter) = &self.rate_limiter {
             // TODO: query redis in the background so that users don't have to wait on this network request
-            if rate_limiter
+            // TODO: better key? have a prefix so its easy to delete all of these
+            let rate_limiter_label = user_key.to_string();
+
+            match rate_limiter
                 .throttle_label(
-                    &user_key.to_string(),
+                    &rate_limiter_label,
                     Some(user_data.user_count_per_period),
                     1,
                 )
                 .await
-                .is_err()
             {
-                // TODO: set headers so they know when they can retry
-                // warn!(?ip, "public rate limit exceeded"); // this is too verbose, but a stat might be good
-                // TODO: use their id if possible
-                // TODO: StatusCode::TOO_MANY_REQUESTS
-                return Err(anyhow::anyhow!("too many requests from this key"));
+                Ok(ThrottleResult::Allowed) => {}
+                Ok(ThrottleResult::RetryAt(_retry_at)) => {
+                    // TODO: set headers so they know when they can retry
+                    debug!(?rate_limiter_label, "user rate limit exceeded"); // this is too verbose, but a stat might be good
+                    // TODO: use their id if possible
+                    return Ok(RateLimitResult::UserRateLimitExceeded(user_data.user_id));
+                }
+                Ok(ThrottleResult::RetryNever) => {
+                    // TODO: prettier error for the user
+                    return Err(anyhow::anyhow!("user blocked by rate limiter"));
+                }
+                Err(err) => {
+                    // internal error, not rate limit being hit
+                    // rather than have downtime, i think its better to just use in-process rate limiting
+                    // TODO: in-process rate limits that pipe into redis
+                    error!(?err, "redis is unhappy. allowing ip");
+                    return Ok(RateLimitResult::AllowedUser(user_data.user_id));
+                } // // TODO: set headers so they know when they can retry
+                  // // warn!(?ip, "public rate limit exceeded"); // this is too verbose, but a stat might be good
+                  // // TODO: use their id if possible
+                  // // TODO: StatusCode::TOO_MANY_REQUESTS
+                  // return Err(anyhow::anyhow!("too many requests from this key"));
             }
         } else {
             // TODO: if no redis, rate limit with a local cache?
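The remaining TODOs (Retry-After headers, StatusCode::TOO_MANY_REQUESTS) hint at how these results will eventually surface to clients. A hypothetical sketch of that mapping, using a simplified stand-in for the enum since the real RateLimitResult in this file has more variants and different field types:

    use axum::http::StatusCode;

    // simplified, illustrative stand-in for the RateLimitResult defined in this file
    enum RateLimitResult {
        AllowedIp(std::net::IpAddr),
        AllowedUser(u64),
        UserRateLimitExceeded(u64),
    }

    // hypothetical helper: turn a rate limit decision into an HTTP response
    fn to_response(result: RateLimitResult) -> (StatusCode, String) {
        match result {
            RateLimitResult::AllowedIp(_) | RateLimitResult::AllowedUser(_) => {
                (StatusCode::OK, "allowed".to_string())
            }
            RateLimitResult::UserRateLimitExceeded(user_id) => {
                // the TODO above also wants a Retry-After header here
                (
                    StatusCode::TOO_MANY_REQUESTS,
                    format!("user {} is rate limited", user_id),
                )
            }
        }
    }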


@@ -458,7 +458,7 @@ impl Web3Connections {
                 debug!(con_head=%heavy_block_id, rpc_head=%rpc_head_str, %rpc, "con {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns)
             } else {
                 // hash changed
-                info!(con_head=%heavy_block_id, rpc_head=%rpc_head_str, old=%old_block_id, %rpc, "unc {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
+                info!(con_head=%heavy_block_id, old=%old_block_id, rpc_head=%rpc_head_str, %rpc, "unc {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);

                 // todo!("handle equal by updating the cannonical chain");
                 self.save_block(&heavy_block, true).await?;


@@ -28,6 +28,9 @@ pub struct Web3Connection {
     url: String,
     /// keep track of currently open requests. We sort on this
     pub(super) active_requests: AtomicU32,
+    /// keep track of total requests
+    /// TODO: is this type okay?
+    pub(super) total_requests: AtomicU64,
     /// provider is in a RwLock so that we can replace it if re-connecting
     /// it is an async lock because we hold it open across awaits
     pub(super) provider: AsyncRwLock<Option<Arc<Web3Provider>>>,
@@ -35,7 +38,7 @@ pub struct Web3Connection {
     hard_limit: Option<RedisRateLimit>,
     /// used for load balancing to the least loaded server
     pub(super) soft_limit: u32,
-    /// TODO: have an enum for this so that "no limit" prints pretty
+    /// TODO: have an enum for this so that "no limit" prints pretty?
     block_data_limit: AtomicU64,
     /// Lower weight are higher priority when sending requests
     pub(super) weight: u32,
@@ -82,6 +85,7 @@ impl Web3Connection {
             name,
             url: url_str.clone(),
             active_requests: 0.into(),
+            total_requests: 0.into(),
             provider: AsyncRwLock::new(Some(Arc::new(provider))),
             hard_limit,
             soft_limit,
@@ -777,13 +781,18 @@ impl Serialize for Web3Connection {
         S: Serializer,
     {
         // 3 is the number of fields in the struct.
-        let mut state = serializer.serialize_struct("Web3Connection", 5)?;
+        let mut state = serializer.serialize_struct("Web3Connection", 6)?;

         // the url is excluded because it likely includes private information. just show the name
         state.serialize_field("name", &self.name)?;

         let block_data_limit = self.block_data_limit.load(atomic::Ordering::Relaxed);
-        state.serialize_field("block_data_limit", &block_data_limit)?;
+        if block_data_limit == u64::MAX {
+            state.serialize_field("block_data_limit", "None")?;
+        } else {
+            state.serialize_field("block_data_limit", &block_data_limit)?;
+        }

         state.serialize_field("soft_limit", &self.soft_limit)?;
@@ -792,6 +801,11 @@ impl Serialize for Web3Connection {
             &self.active_requests.load(atomic::Ordering::Relaxed),
         )?;

+        state.serialize_field(
+            "total_requests",
+            &self.total_requests.load(atomic::Ordering::Relaxed),
+        )?;
+
         let head_block_id = &*self.head_block_id.read();
         state.serialize_field("head_block_id", head_block_id)?;


@@ -56,7 +56,7 @@ impl Web3Connections {
         chain_id: u64,
         server_configs: HashMap<String, Web3ConnectionConfig>,
         http_client: Option<reqwest::Client>,
-        redis_client_pool: Option<redis_rate_limit::RedisPool>,
+        redis_pool: Option<redis_rate_limit::RedisPool>,
         block_map: BlockHashesMap,
         head_block_sender: Option<watch::Sender<ArcBlock>>,
         min_sum_soft_limit: u32,
@@ -83,7 +83,7 @@ impl Web3Connections {
                 async move {
                     loop {
-                        // TODO: every time a head_block arrives (maybe with a small delay), or on the interval.
+                        // TODO: every time a head_block arrives (with a small delay for known slow servers), or on the interval.
                         interval.tick().await;

                         trace!("http interval ready");
@@ -108,7 +108,7 @@ impl Web3Connections {
             .into_iter()
             .map(|(server_name, server_config)| {
                 let http_client = http_client.clone();
-                let redis_client_pool = redis_client_pool.clone();
+                let redis_pool = redis_pool.clone();
                 let http_interval_sender = http_interval_sender.clone();

                 let block_sender = if head_block_sender.is_some() {
@@ -124,7 +124,7 @@ impl Web3Connections {
                 server_config
                     .spawn(
                         server_name,
-                        redis_client_pool,
+                        redis_pool,
                         chain_id,
                         http_client,
                         http_interval_sender,
@@ -159,11 +159,16 @@ impl Web3Connections {
             }
         }

-        // TODO: less than 3? what should we do here?
-        if connections.len() < 2 {
-            warn!("Only {} connection(s)!", connections.len());
+        if connections.len() < min_synced_rpcs {
+            return Err(anyhow::anyhow!(
+                "Only {}/{} connections!",
+                connections.len(),
+                min_synced_rpcs
+            ));
         }

+        // TODO: safety check on sum soft limit
         let synced_connections = SyncedConnections::default();

         // TODO: sizing and expiration on these caches!
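The min_synced_rpcs check above replaces the old hard-coded "< 2" warning. The new TODO about a sum soft limit safety check could mirror it, comparing the combined soft limits against the min_sum_soft_limit from the config shown earlier. A sketch of that idea only; this is not the project's implementation:

    // hypothetical safety check: refuse to start if even a fully synced set of
    // servers cannot reach the configured minimum combined soft limit
    fn check_sum_soft_limit(soft_limits: &[u32], min_sum_soft_limit: u32) -> anyhow::Result<()> {
        let sum_soft_limit: u32 = soft_limits.iter().sum();

        if sum_soft_limit < min_sum_soft_limit {
            return Err(anyhow::anyhow!(
                "Only {}/{} soft limit!",
                sum_soft_limit,
                min_sum_soft_limit
            ));
        }

        Ok(())
    }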


@@ -1,8 +1,7 @@
 use std::time::Duration;

 use derive_more::From;
-use ethers::providers::Middleware;
-use tracing::{error_span, info_span, instrument, Instrument};
+use tracing::{info_span, instrument, Instrument};

 /// Use HTTP and WS providers.
 // TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit <https://github.com/gakonst/ethers-rs/issues/592>


@@ -22,12 +22,18 @@ pub struct OpenRequestHandle(Arc<Web3Connection>);
 impl OpenRequestHandle {
     pub fn new(connection: Arc<Web3Connection>) -> Self {
-        // TODO: attach a unique id to this?
+        // TODO: attach a unique id to this? customer requests have one, but not internal queries
         // TODO: what ordering?!
         connection
             .active_requests
             .fetch_add(1, atomic::Ordering::AcqRel);

+        // TODO: handle overflows?
+        // TODO: what ordering?
+        connection
+            .total_requests
+            .fetch_add(1, atomic::Ordering::Relaxed);
+
         Self(connection)
     }
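On the two new TODOs: for a statistics counter that nothing synchronizes on, Relaxed ordering is generally sufficient, and fetch_add on an AtomicU64 wraps on overflow rather than panicking, which would take an impractically long time at realistic request rates. A standalone illustration of the pattern, separate from this codebase:

    use std::sync::atomic::{AtomicU64, Ordering};

    // monotonically increasing request counter, used only for reporting
    static TOTAL_REQUESTS: AtomicU64 = AtomicU64::new(0);

    fn record_request() {
        // Relaxed is enough: no other memory accesses are ordered against this
        // counter, and a slightly stale read is fine for a status page
        TOTAL_REQUESTS.fetch_add(1, Ordering::Relaxed);
    }

    fn snapshot() -> u64 {
        TOTAL_REQUESTS.load(Ordering::Relaxed)
    }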