improve hard limits

rate limit per server or per endpoint and with any period
Bryan Stitt 2023-08-03 11:54:50 -07:00
parent 690601643d
commit 0bc523ab9f
11 changed files with 79 additions and 41 deletions


@@ -59,9 +59,9 @@ Be sure to set `innodb_rollback_on_timeout=1`
 ## Influx
-If running multiple web3-proxies connected to the same influxdb bucket, you **must** set `app.influxdb_id` to a globally unique value for each server!
-`app.influxdb_id` defaults to 0 which will only work if you only have one server!
+If running multiple web3-proxies connected to the same influxdb bucket, you **must** set `app.unique_id` to a globally unique value for each server!
+`app.unique_id` defaults to 0 which will only work if you only have one server!
 ## Common commands


@@ -168,7 +168,7 @@ impl Web3ProxyApp {
     pub async fn spawn(
         frontend_port: Arc<AtomicU16>,
         prometheus_port: Arc<AtomicU16>,
-        top_config: TopConfig,
+        mut top_config: TopConfig,
         num_workers: usize,
         shutdown_sender: broadcast::Sender<()>,
         flush_stat_buffer_sender: mpsc::Sender<oneshot::Sender<FlushedStats>>,
@@ -178,6 +178,8 @@ impl Web3ProxyApp {
         let mut config_watcher_shutdown_receiver = shutdown_sender.subscribe();
         let mut background_shutdown_receiver = shutdown_sender.subscribe();
+        top_config.clean();
+
         let (new_top_config_sender, mut new_top_config_receiver) =
             watch::channel(top_config.clone());
         new_top_config_receiver.borrow_and_update();
@@ -196,20 +198,6 @@ impl Web3ProxyApp {
             );
         }
-        if !top_config.extra.is_empty() {
-            warn!(
-                extra=?top_config.extra.keys(),
-                "unknown TopConfig fields!",
-            );
-        }
-        if !top_config.app.extra.is_empty() {
-            warn!(
-                extra=?top_config.app.extra.keys(),
-                "unknown Web3ProxyAppConfig fields!",
-            );
-        }
         // these futures are key parts of the app. if they stop running, the app has encountered an irrecoverable error
         // TODO: this is a small enough group, that a vec with try_join_all is probably fine
         let app_handles: FuturesUnordered<Web3ProxyJoinHandle<()>> = FuturesUnordered::new();
@@ -338,7 +326,7 @@ impl Web3ProxyApp {
             10,
             flush_stat_buffer_sender.clone(),
             flush_stat_buffer_receiver,
-            top_config.app.influxdb_id,
+            top_config.app.unique_id,
         )? {
             // since the database entries are used for accounting, we want to be sure everything is saved before exiting
             important_background_handles.push(spawned_stat_buffer.background_handle);


@@ -1,4 +1,5 @@
 use crate::app::Web3ProxyJoinHandle;
+use crate::compute_units::default_usd_per_cu;
 use crate::rpcs::blockchain::{BlocksByHashCache, Web3ProxyBlock};
 use crate::rpcs::one::Web3Rpc;
 use argh::FromArgs;
@@ -52,6 +53,20 @@ pub struct TopConfig {
     pub extra: HashMap<String, serde_json::Value>,
 }
+
+impl TopConfig {
+    /// TODO: this should probably be part of Deserialize
+    pub fn clean(&mut self) {
+        if !self.extra.is_empty() {
+            warn!(
+                extra=?self.extra.keys(),
+                "unknown TopConfig fields!",
+            );
+        }
+
+        self.app.clean();
+    }
+}
 /// shared configuration between Web3Rpcs
 // TODO: no String, only &str
 #[serde_inline_default]
@@ -192,15 +207,16 @@ pub struct AppConfig {
     /// influxdb bucket to use for stats
     pub influxdb_bucket: Option<String>,
-    /// influxdb_id to use to keep stats from different servers being seen as duplicates of each other
+    /// unique_id keeps stats from different servers being seen as duplicates of each other.
     /// this int is used as part of the "nanoseconds" part of the influx timestamp.
+    /// it can also be used by the rate limiter.
     ///
     /// This **MUST** be set to a unique value for each running server.
-    /// If not set, severs will overwrite eachother's stats.
+    /// If not set, severs will overwrite eachother's stats!
    ///
     /// <https://docs.influxdata.com/influxdb/v2.0/write-data/best-practices/duplicate-points/#increment-the-timestamp>
     #[serde_inline_default(0i64)]
-    pub influxdb_id: i64,
+    pub unique_id: i64,
     /// unknown config options get put here
     #[serde(flatten, default = "HashMap::default")]
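The renamed field's doc comment above is the heart of the change: the id is folded into the timestamp so two proxies never write the exact same point to influx. A minimal sketch of that idea follows; the function name and the exact arithmetic are illustrative only, not the proxy's actual stat code.

```rust
// Hypothetical sketch (not from this repo) of the duplicate-point problem described above.
// If two proxies flush a stat with the same tags during the same second, influx would treat
// the points as duplicates; folding each server's unique_id into the nanosecond component
// gives every proxy a distinct timestamp.
fn stat_timestamp_ns(epoch_secs: i64, unique_id: i64) -> i64 {
    epoch_secs * 1_000_000_000 + unique_id
}

fn main() {
    let now = 1_691_088_000; // same wall-clock second on both proxies
    assert_ne!(stat_timestamp_ns(now, 0), stat_timestamp_ns(now, 1));
}
```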
@@ -213,6 +229,26 @@ impl Default for AppConfig {
     }
 }
+
+impl AppConfig {
+    /// TODO: this should probably be part of Deserialize
+    fn clean(&mut self) {
+        if self.usd_per_cu.is_none() {
+            self.usd_per_cu = Some(default_usd_per_cu(self.chain_id));
+        }
+
+        if let Some(influxdb_id) = self.extra.get("influxdb_id") {
+            self.unique_id = influxdb_id.as_i64().unwrap();
+        }
+
+        if !self.extra.is_empty() {
+            warn!(
+                extra=?self.extra.keys(),
+                "unknown Web3ProxyAppConfig fields!",
+            );
+        }
+    }
+}
 /// TODO: we can't query a provider because we need this to create a provider
 pub fn average_block_interval(chain_id: u64) -> Duration {
     match chain_id {
@@ -258,8 +294,15 @@ pub struct Web3RpcConfig {
     /// the requests per second at which the server starts slowing down
     #[serde_inline_default(1u32)]
     pub soft_limit: u32,
-    /// the requests per second at which the server throws errors (rate limit or otherwise)
+    /// the requests per period at which the server throws errors (rate limit or otherwise)
     pub hard_limit: Option<u64>,
+    /// the number of seconds in a rate limiting period
+    /// some providers allow burst limits and rolling windows, but coding that is a lot more complicated
+    #[serde_inline_default(1u32)]
+    pub hard_limit_period: u32,
+    /// if hard limits are applied per server or per endpoint. default is per server
+    #[serde(default = "Default::default")]
+    pub hard_limit_per_endpoint: bool,
     /// only use this rpc if everything else is lagging too far. this allows us to ignore fast but very low limit rpcs
     #[serde(default = "Default::default")]
     pub backup: bool,
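Together, the two new fields change `hard_limit` from "requests per second" into "requests per `hard_limit_period` seconds", and let that budget be shared by every proxy that talks to the same endpoint. Below is a minimal sketch of how the serde defaults behave, using an invented `HardLimitConfig` struct that mirrors only the new knobs; the real `Web3RpcConfig` has many more fields.

```rust
use serde::Deserialize;
use serde_inline_default::serde_inline_default;

// Trimmed-down, hypothetical mirror of just the new rate-limit fields. It only demonstrates
// how the defaults above behave when the keys are omitted from a config file.
#[serde_inline_default]
#[derive(Debug, Deserialize)]
struct HardLimitConfig {
    hard_limit: Option<u64>,
    #[serde_inline_default(1u32)]
    hard_limit_period: u32,
    #[serde(default)]
    hard_limit_per_endpoint: bool,
}

fn main() {
    // 300 requests per 60-second window, shared by every proxy that talks to this endpoint
    let explicit: HardLimitConfig = toml::from_str(
        "hard_limit = 300\nhard_limit_period = 60\nhard_limit_per_endpoint = true",
    )
    .unwrap();
    assert_eq!(explicit.hard_limit_period, 60);

    // omitting the new keys falls back to a 1-second period, limited per server
    let defaults: HardLimitConfig = toml::from_str("hard_limit = 300").unwrap();
    assert_eq!(defaults.hard_limit_period, 1);
    assert!(!defaults.hard_limit_per_endpoint);
}
```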
@@ -286,6 +329,7 @@ impl Web3RpcConfig {
         self,
         name: String,
         redis_pool: Option<redis_rate_limiter::RedisPool>,
+        server_id: i64,
         chain_id: u64,
         block_interval: Duration,
         http_client: Option<reqwest::Client>,
@@ -294,6 +338,7 @@ impl Web3RpcConfig {
         max_head_block_age: Duration,
     ) -> anyhow::Result<(Arc<Web3Rpc>, Web3ProxyJoinHandle<()>)> {
         if !self.extra.is_empty() {
+            // TODO: move this to a `clean` function
             warn!(extra=?self.extra.keys(), "unknown Web3RpcConfig fields!");
         }
@@ -303,6 +348,7 @@ impl Web3RpcConfig {
             chain_id,
             http_client,
             redis_pool,
+            server_id,
             block_interval,
             blocks_by_hash_cache,
             block_sender,


@@ -170,6 +170,8 @@ impl Web3Rpcs {
         let mut names_to_keep = vec![];
+        let server_id = app.config.unique_id;
+
         // turn configs into connections (in parallel)
         let mut spawn_handles: FuturesUnordered<_> = rpc_configs
             .into_iter()
@@ -197,6 +199,7 @@ impl Web3Rpcs {
             let handle = tokio::spawn(server_config.spawn(
                 server_name,
                 vredis_pool,
+                server_id,
                 chain_id,
                 block_interval,
                 http_client,


@@ -95,6 +95,7 @@ impl Web3Rpc {
         // optional because this is only used for http providers. websocket-only providers don't use it
         http_client: Option<reqwest::Client>,
         redis_pool: Option<RedisPool>,
+        server_id: i64,
         block_interval: Duration,
         block_map: BlocksByHashCache,
         block_and_rpc_sender: Option<mpsc::UnboundedSender<BlockAndRpc>>,
@@ -105,12 +106,18 @@ impl Web3Rpc {
         let hard_limit = match (config.hard_limit, redis_pool) {
             (None, None) => None,
             (Some(hard_limit), Some(redis_pool)) => {
-                // TODO: in process rate limiter instead? or is deffered good enough?
+                let label = if config.hard_limit_per_endpoint {
+                    format!("{}:{}:{}", chain_id, "endpoint", name)
+                } else {
+                    format!("{}:{}:{}", chain_id, server_id, name)
+                };
+
+                // TODO: in process rate limiter instead? or maybe deferred? or is this good enough?
                 let rrl = RedisRateLimiter::new(
                     "web3_proxy",
-                    &format!("{}:{}", chain_id, name),
+                    &label,
                     hard_limit,
-                    60.0,
+                    config.hard_limit_period as f32,
                     redis_pool,
                 );
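In practice, `hard_limit_per_endpoint` only changes which Redis key the limiter increments. The sketch below lifts the key format straight from the match arm above; the wrapper function and the rpc name are illustrative.

```rust
// With per-endpoint limits, every proxy produces the same Redis key and therefore shares
// one budget; otherwise each proxy's unique_id (server_id) keeps its counter separate.
fn rate_limit_label(chain_id: u64, server_id: i64, name: &str, per_endpoint: bool) -> String {
    if per_endpoint {
        format!("{}:{}:{}", chain_id, "endpoint", name)
    } else {
        format!("{}:{}:{}", chain_id, server_id, name)
    }
}

fn main() {
    // two proxies (unique_id 1 and 2) talking to the same upstream rpc on chain 1
    let a = rate_limit_label(1, 1, "example_rpc", true);
    let b = rate_limit_label(1, 2, "example_rpc", true);
    assert_eq!(a, b); // per-endpoint: same key, shared budget

    let a = rate_limit_label(1, 1, "example_rpc", false);
    let b = rate_limit_label(1, 2, "example_rpc", false);
    assert_ne!(a, b); // per-server: unique_id keeps the counters separate
}
```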


@@ -198,6 +198,8 @@ fn main() -> anyhow::Result<()> {
                 }
             }
+            top_config.clean();
+
             (Some(top_config), Some(top_config_path))
         } else {
             (None, None)


@@ -20,9 +20,10 @@ impl CheckConfigSubCommand {
         info!("Loading config @ {}", self.path);
         let top_config: String = fs::read_to_string(self.path)?;
-        let top_config: TopConfig = toml::from_str(&top_config)?;
+        let mut top_config: TopConfig = toml::from_str(&top_config)?;
+        top_config.clean();
+        // TODO: pretty print
         info!("config: {:#?}", top_config);
         if top_config.app.db_url.is_none() {


@@ -93,7 +93,7 @@ impl MigrateStatsToV2SubCommand {
             60,
             flush_sender,
             flush_receiver,
-            top_config.app.influxdb_id,
+            top_config.app.unique_id,
         )
         .context("Error spawning stat buffer")?
         .context("No stat buffer spawned. Maybe missing influx or db credentials?")?;


@@ -5,7 +5,6 @@ use std::time::Duration;
 use std::{fs, thread};
 use tracing::{error, info, trace, warn};
 use web3_proxy::app::{flatten_handle, flatten_handles, Web3ProxyApp};
-use web3_proxy::compute_units::default_usd_per_cu;
 use web3_proxy::config::TopConfig;
 use web3_proxy::globals::global_db_conn;
 use web3_proxy::prelude::anyhow;
@@ -74,11 +73,6 @@ impl ProxydSubCommand {
         flush_stat_buffer_sender: mpsc::Sender<oneshot::Sender<FlushedStats>>,
         flush_stat_buffer_receiver: mpsc::Receiver<oneshot::Sender<FlushedStats>>,
     ) -> anyhow::Result<()> {
-        // TODO: this is gross but it works. i'd rather it be called by serde, but it needs to know the chain id
-        if top_config.app.usd_per_cu.is_none() {
-            top_config.app.usd_per_cu = Some(default_usd_per_cu(top_config.app.chain_id));
-        }
-
         // tokio has code for catching ctrl+c so we use that to shut down in most cases
         // frontend_shutdown_sender is currently only used in tests, but we might make a /shutdown endpoint or something
         // we do not need this receiver. new receivers are made by `shutdown_sender.subscribe()`
@@ -115,10 +109,7 @@ impl ProxydSubCommand {
                 match fs::read_to_string(&top_config_path) {
                     Ok(new_top_config) => match toml::from_str::<TopConfig>(&new_top_config) {
                         Ok(mut new_top_config) => {
-                            if new_top_config.app.usd_per_cu.is_none() {
-                                new_top_config.app.usd_per_cu =
-                                    Some(default_usd_per_cu(new_top_config.app.chain_id));
-                            }
+                            new_top_config.clean();
                             if new_top_config != current_config {
                                 trace!("current_config: {:#?}", current_config);


@@ -48,7 +48,7 @@ impl TestApp {
         anvil: &TestAnvil,
         db: Option<&TestMysql>,
         influx: Option<&TestInflux>,
-        influx_id: Option<u64>,
+        unique_id: Option<u64>,
     ) -> Self {
         let chain_id = anvil.instance.chain_id();
         let num_workers = 4;
@@ -80,7 +80,7 @@ impl TestApp {
                 "influxdb_org": influx_org,
                 "influxdb_token": influx_token,
                 "influxdb_bucket": influx_bucket,
-                "influxdb_id": influx_id.unwrap_or_default(),
+                "unique_id": unique_id.unwrap_or_default(),
                 "default_user_max_requests_per_period": Some(6_000_000),
                 "deposit_factory_contract": Address::from_str(
                     "4e3BC2054788De923A04936C6ADdB99A05B0Ea36",


@@ -137,7 +137,7 @@ async fn test_multiple_proxies_stats_add_up() {
     // assert_eq!(flush_1_count_0.timeseries, 2);
     // give time for more stats to arrive
-    sleep(Duration::from_secs(2)).await;
+    sleep(Duration::from_secs(5)).await;
     // no more stats should arrive
     let flush_0_count_1 = x_0.flush_stats().await.unwrap();