diff --git a/README.md b/README.md index 44f64011..4dfda9d8 100644 --- a/README.md +++ b/README.md @@ -59,9 +59,9 @@ Be sure to set `innodb_rollback_on_timeout=1` ## Influx -If running multiple web3-proxies connected to the same influxdb bucket, you **must** set `app.influxdb_id` to a globally unique value for each server! +If running multiple web3-proxies connected to the same influxdb bucket, you **must** set `app.unique_id` to a globally unique value for each server! -`app.influxdb_id` defaults to 0 which will only work if you only have one server! +`app.unique_id` defaults to 0 which will only work if you only have one server! ## Common commands diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 0c029579..e66bb77a 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -168,7 +168,7 @@ impl Web3ProxyApp { pub async fn spawn( frontend_port: Arc, prometheus_port: Arc, - top_config: TopConfig, + mut top_config: TopConfig, num_workers: usize, shutdown_sender: broadcast::Sender<()>, flush_stat_buffer_sender: mpsc::Sender>, @@ -178,6 +178,8 @@ impl Web3ProxyApp { let mut config_watcher_shutdown_receiver = shutdown_sender.subscribe(); let mut background_shutdown_receiver = shutdown_sender.subscribe(); + top_config.clean(); + let (new_top_config_sender, mut new_top_config_receiver) = watch::channel(top_config.clone()); new_top_config_receiver.borrow_and_update(); @@ -196,20 +198,6 @@ impl Web3ProxyApp { ); } - if !top_config.extra.is_empty() { - warn!( - extra=?top_config.extra.keys(), - "unknown TopConfig fields!", - ); - } - - if !top_config.app.extra.is_empty() { - warn!( - extra=?top_config.app.extra.keys(), - "unknown Web3ProxyAppConfig fields!", - ); - } - // these futures are key parts of the app. if they stop running, the app has encountered an irrecoverable error // TODO: this is a small enough group, that a vec with try_join_all is probably fine let app_handles: FuturesUnordered> = FuturesUnordered::new(); @@ -338,7 +326,7 @@ impl Web3ProxyApp { 10, flush_stat_buffer_sender.clone(), flush_stat_buffer_receiver, - top_config.app.influxdb_id, + top_config.app.unique_id, )? { // since the database entries are used for accounting, we want to be sure everything is saved before exiting important_background_handles.push(spawned_stat_buffer.background_handle); diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 4dfa83ad..944835f1 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -1,4 +1,5 @@ use crate::app::Web3ProxyJoinHandle; +use crate::compute_units::default_usd_per_cu; use crate::rpcs::blockchain::{BlocksByHashCache, Web3ProxyBlock}; use crate::rpcs::one::Web3Rpc; use argh::FromArgs; @@ -52,6 +53,20 @@ pub struct TopConfig { pub extra: HashMap, } +impl TopConfig { + /// TODO: this should probably be part of Deserialize + pub fn clean(&mut self) { + if !self.extra.is_empty() { + warn!( + extra=?self.extra.keys(), + "unknown TopConfig fields!", + ); + } + + self.app.clean(); + } +} + /// shared configuration between Web3Rpcs // TODO: no String, only &str #[serde_inline_default] @@ -192,15 +207,16 @@ pub struct AppConfig { /// influxdb bucket to use for stats pub influxdb_bucket: Option, - /// influxdb_id to use to keep stats from different servers being seen as duplicates of each other + /// unique_id keeps stats from different servers being seen as duplicates of each other. /// this int is used as part of the "nanoseconds" part of the influx timestamp. + /// it can also be used by the rate limiter. /// /// This **MUST** be set to a unique value for each running server. - /// If not set, severs will overwrite eachother's stats. + /// If not set, severs will overwrite eachother's stats! /// /// #[serde_inline_default(0i64)] - pub influxdb_id: i64, + pub unique_id: i64, /// unknown config options get put here #[serde(flatten, default = "HashMap::default")] @@ -213,6 +229,26 @@ impl Default for AppConfig { } } +impl AppConfig { + /// TODO: this should probably be part of Deserialize + fn clean(&mut self) { + if self.usd_per_cu.is_none() { + self.usd_per_cu = Some(default_usd_per_cu(self.chain_id)); + } + + if let Some(influxdb_id) = self.extra.get("influxdb_id") { + self.unique_id = influxdb_id.as_i64().unwrap(); + } + + if !self.extra.is_empty() { + warn!( + extra=?self.extra.keys(), + "unknown Web3ProxyAppConfig fields!", + ); + } + } +} + /// TODO: we can't query a provider because we need this to create a provider pub fn average_block_interval(chain_id: u64) -> Duration { match chain_id { @@ -258,8 +294,15 @@ pub struct Web3RpcConfig { /// the requests per second at which the server starts slowing down #[serde_inline_default(1u32)] pub soft_limit: u32, - /// the requests per second at which the server throws errors (rate limit or otherwise) + /// the requests per period at which the server throws errors (rate limit or otherwise) pub hard_limit: Option, + /// the number of seconds in a rate limiting period + /// some providers allow burst limits and rolling windows, but coding that is a lot more complicated + #[serde_inline_default(1u32)] + pub hard_limit_period: u32, + /// if hard limits are applied per server or per endpoint. default is per server + #[serde(default = "Default::default")] + pub hard_limit_per_endpoint: bool, /// only use this rpc if everything else is lagging too far. this allows us to ignore fast but very low limit rpcs #[serde(default = "Default::default")] pub backup: bool, @@ -286,6 +329,7 @@ impl Web3RpcConfig { self, name: String, redis_pool: Option, + server_id: i64, chain_id: u64, block_interval: Duration, http_client: Option, @@ -294,6 +338,7 @@ impl Web3RpcConfig { max_head_block_age: Duration, ) -> anyhow::Result<(Arc, Web3ProxyJoinHandle<()>)> { if !self.extra.is_empty() { + // TODO: move this to a `clean` function warn!(extra=?self.extra.keys(), "unknown Web3RpcConfig fields!"); } @@ -303,6 +348,7 @@ impl Web3RpcConfig { chain_id, http_client, redis_pool, + server_id, block_interval, blocks_by_hash_cache, block_sender, diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 96e88c06..749f1ad0 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -170,6 +170,8 @@ impl Web3Rpcs { let mut names_to_keep = vec![]; + let server_id = app.config.unique_id; + // turn configs into connections (in parallel) let mut spawn_handles: FuturesUnordered<_> = rpc_configs .into_iter() @@ -197,6 +199,7 @@ impl Web3Rpcs { let handle = tokio::spawn(server_config.spawn( server_name, vredis_pool, + server_id, chain_id, block_interval, http_client, diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 2622ec94..2366dc35 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -95,6 +95,7 @@ impl Web3Rpc { // optional because this is only used for http providers. websocket-only providers don't use it http_client: Option, redis_pool: Option, + server_id: i64, block_interval: Duration, block_map: BlocksByHashCache, block_and_rpc_sender: Option>, @@ -105,12 +106,18 @@ impl Web3Rpc { let hard_limit = match (config.hard_limit, redis_pool) { (None, None) => None, (Some(hard_limit), Some(redis_pool)) => { - // TODO: in process rate limiter instead? or is deffered good enough? + let label = if config.hard_limit_per_endpoint { + format!("{}:{}:{}", chain_id, "endpoint", name) + } else { + format!("{}:{}:{}", chain_id, server_id, name) + }; + + // TODO: in process rate limiter instead? or maybe deferred? or is this good enough? let rrl = RedisRateLimiter::new( "web3_proxy", - &format!("{}:{}", chain_id, name), + &label, hard_limit, - 60.0, + config.hard_limit_period as f32, redis_pool, ); diff --git a/web3_proxy_cli/src/main.rs b/web3_proxy_cli/src/main.rs index eb3942c2..36174149 100644 --- a/web3_proxy_cli/src/main.rs +++ b/web3_proxy_cli/src/main.rs @@ -198,6 +198,8 @@ fn main() -> anyhow::Result<()> { } } + top_config.clean(); + (Some(top_config), Some(top_config_path)) } else { (None, None) diff --git a/web3_proxy_cli/src/sub_commands/check_config.rs b/web3_proxy_cli/src/sub_commands/check_config.rs index dc976a3b..ffe811a4 100644 --- a/web3_proxy_cli/src/sub_commands/check_config.rs +++ b/web3_proxy_cli/src/sub_commands/check_config.rs @@ -20,9 +20,10 @@ impl CheckConfigSubCommand { info!("Loading config @ {}", self.path); let top_config: String = fs::read_to_string(self.path)?; - let top_config: TopConfig = toml::from_str(&top_config)?; + let mut top_config: TopConfig = toml::from_str(&top_config)?; + + top_config.clean(); - // TODO: pretty print info!("config: {:#?}", top_config); if top_config.app.db_url.is_none() { diff --git a/web3_proxy_cli/src/sub_commands/migrate_stats_to_v2.rs b/web3_proxy_cli/src/sub_commands/migrate_stats_to_v2.rs index 75f708cc..8358f570 100644 --- a/web3_proxy_cli/src/sub_commands/migrate_stats_to_v2.rs +++ b/web3_proxy_cli/src/sub_commands/migrate_stats_to_v2.rs @@ -93,7 +93,7 @@ impl MigrateStatsToV2SubCommand { 60, flush_sender, flush_receiver, - top_config.app.influxdb_id, + top_config.app.unique_id, ) .context("Error spawning stat buffer")? .context("No stat buffer spawned. Maybe missing influx or db credentials?")?; diff --git a/web3_proxy_cli/src/sub_commands/proxyd.rs b/web3_proxy_cli/src/sub_commands/proxyd.rs index aef14eda..ee1cdb0a 100644 --- a/web3_proxy_cli/src/sub_commands/proxyd.rs +++ b/web3_proxy_cli/src/sub_commands/proxyd.rs @@ -5,7 +5,6 @@ use std::time::Duration; use std::{fs, thread}; use tracing::{error, info, trace, warn}; use web3_proxy::app::{flatten_handle, flatten_handles, Web3ProxyApp}; -use web3_proxy::compute_units::default_usd_per_cu; use web3_proxy::config::TopConfig; use web3_proxy::globals::global_db_conn; use web3_proxy::prelude::anyhow; @@ -74,11 +73,6 @@ impl ProxydSubCommand { flush_stat_buffer_sender: mpsc::Sender>, flush_stat_buffer_receiver: mpsc::Receiver>, ) -> anyhow::Result<()> { - // TODO: this is gross but it works. i'd rather it be called by serde, but it needs to know the chain id - if top_config.app.usd_per_cu.is_none() { - top_config.app.usd_per_cu = Some(default_usd_per_cu(top_config.app.chain_id)); - } - // tokio has code for catching ctrl+c so we use that to shut down in most cases // frontend_shutdown_sender is currently only used in tests, but we might make a /shutdown endpoint or something // we do not need this receiver. new receivers are made by `shutdown_sender.subscribe()` @@ -115,10 +109,7 @@ impl ProxydSubCommand { match fs::read_to_string(&top_config_path) { Ok(new_top_config) => match toml::from_str::(&new_top_config) { Ok(mut new_top_config) => { - if new_top_config.app.usd_per_cu.is_none() { - new_top_config.app.usd_per_cu = - Some(default_usd_per_cu(new_top_config.app.chain_id)); - } + new_top_config.clean(); if new_top_config != current_config { trace!("current_config: {:#?}", current_config); diff --git a/web3_proxy_cli/src/test_utils/app.rs b/web3_proxy_cli/src/test_utils/app.rs index 241fbc21..77d4b98f 100644 --- a/web3_proxy_cli/src/test_utils/app.rs +++ b/web3_proxy_cli/src/test_utils/app.rs @@ -48,7 +48,7 @@ impl TestApp { anvil: &TestAnvil, db: Option<&TestMysql>, influx: Option<&TestInflux>, - influx_id: Option, + unique_id: Option, ) -> Self { let chain_id = anvil.instance.chain_id(); let num_workers = 4; @@ -80,7 +80,7 @@ impl TestApp { "influxdb_org": influx_org, "influxdb_token": influx_token, "influxdb_bucket": influx_bucket, - "influxdb_id": influx_id.unwrap_or_default(), + "unique_id": unique_id.unwrap_or_default(), "default_user_max_requests_per_period": Some(6_000_000), "deposit_factory_contract": Address::from_str( "4e3BC2054788De923A04936C6ADdB99A05B0Ea36", diff --git a/web3_proxy_cli/tests/test_multiple_proxy.rs b/web3_proxy_cli/tests/test_multiple_proxy.rs index 3146bf13..1abdc3c7 100644 --- a/web3_proxy_cli/tests/test_multiple_proxy.rs +++ b/web3_proxy_cli/tests/test_multiple_proxy.rs @@ -137,7 +137,7 @@ async fn test_multiple_proxies_stats_add_up() { // assert_eq!(flush_1_count_0.timeseries, 2); // give time for more stats to arrive - sleep(Duration::from_secs(2)).await; + sleep(Duration::from_secs(5)).await; // no more stats should arrive let flush_0_count_1 = x_0.flush_stats().await.unwrap();