diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 3cdbdf88..c0ff46fb 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -179,6 +179,7 @@ impl Web3ProxyApp { top_config: TopConfig, num_workers: usize, shutdown_sender: broadcast::Sender<()>, + flush_stat_buffer_sender: flume::Sender>, flush_stat_buffer_receiver: flume::Receiver>, ) -> anyhow::Result { let stat_buffer_shutdown_receiver = shutdown_sender.subscribe(); @@ -390,6 +391,7 @@ impl Web3ProxyApp { Some(user_balance_cache.clone()), stat_buffer_shutdown_receiver, 1, + flush_stat_buffer_sender.clone(), flush_stat_buffer_receiver, )? { // since the database entries are used for accounting, we want to be sure everything is saved before exiting diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 31d60f42..47288200 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -279,6 +279,7 @@ impl Web3RpcConfig { http_client: Option, blocks_by_hash_cache: BlocksByHashCache, block_sender: Option>, + max_head_block_age: Duration, tx_id_sender: Option>, ) -> anyhow::Result<(Arc, Web3ProxyJoinHandle<()>)> { if !self.extra.is_empty() { @@ -295,6 +296,7 @@ impl Web3RpcConfig { block_interval, blocks_by_hash_cache, block_sender, + max_head_block_age, tx_id_sender, ) .await diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 63e0f297..43f45334 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -221,6 +221,7 @@ impl Web3Rpcs { http_client, blocks_by_hash_cache, block_sender, + self.max_head_block_age, pending_tx_id_sender, )); diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 9f746a9b..9e4e60e9 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -71,6 +71,8 @@ pub struct Web3Rpc { pub(super) internal_requests: AtomicUsize, /// Track total external requests served pub(super) external_requests: AtomicUsize, + /// If the head block is too old, it is ignored. + pub(super) max_head_block_age: Duration, /// Track time used by external requests served /// request_ms_histogram is only inside an Option so that the "Default" derive works. it will always be set. pub(super) median_latency: Option, @@ -97,6 +99,7 @@ impl Web3Rpc { block_interval: Duration, block_map: BlocksByHashCache, block_and_rpc_sender: Option>, + max_head_block_age: Duration, tx_id_sender: Option)>>, ) -> anyhow::Result<(Arc, Web3ProxyJoinHandle<()>)> { let created_at = Instant::now(); @@ -193,6 +196,7 @@ impl Web3Rpc { hard_limit_until: Some(hard_limit_until), head_block: Some(head_block), http_provider, + max_head_block_age, name, peak_latency: Some(peak_latency), median_latency: Some(median_request_latency), @@ -544,9 +548,12 @@ impl Web3Rpc { let head_block = self.head_block.as_ref().unwrap().borrow().clone(); if let Some(head_block) = head_block { - let head_block = head_block.block; - // TODO: if head block is very old and not expected to be syncing, emit warning + if head_block.age() > self.max_head_block_age { + return Err(anyhow::anyhow!("head_block is too old!").into()); + } + + let head_block = head_block.block; let block_number = head_block.number.context("no block number")?; diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs index 84093d77..866d468b 100644 --- a/web3_proxy/src/stats/stat_buffer.rs +++ b/web3_proxy/src/stats/stat_buffer.rs @@ -49,9 +49,11 @@ pub struct StatBuffer { influxdb_client: Option, opt_in_timeseries_buffer: HashMap, rpc_secret_key_cache: RpcSecretKeyCache, - user_balance_cache: UserBalanceCache, timestamp_precision: TimestampPrecision, tsdb_save_interval_seconds: u32, + user_balance_cache: UserBalanceCache, + + _flush_sender: flume::Sender>, } impl StatBuffer { @@ -67,6 +69,7 @@ impl StatBuffer { user_balance_cache: Option, shutdown_receiver: broadcast::Receiver<()>, tsdb_save_interval_seconds: u32, + flush_sender: flume::Sender>, flush_receiver: flume::Receiver>, ) -> anyhow::Result> { if influxdb_bucket.is_none() { @@ -92,9 +95,10 @@ impl StatBuffer { influxdb_client, opt_in_timeseries_buffer: Default::default(), rpc_secret_key_cache: rpc_secret_key_cache.unwrap(), - user_balance_cache: user_balance_cache.unwrap(), timestamp_precision, tsdb_save_interval_seconds, + user_balance_cache: user_balance_cache.unwrap(), + _flush_sender: flush_sender, }; // any errors inside this task will cause the application to exit diff --git a/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs b/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs index ce0ae3fe..8440696b 100644 --- a/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs +++ b/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs @@ -72,7 +72,7 @@ impl MigrateStatsToV2SubCommand { None => None, }; - let (_flush_sender, flush_receiver) = flume::bounded(1); + let (flush_sender, flush_receiver) = flume::bounded(1); // Spawn the stat-sender let emitter_spawn = StatBuffer::try_spawn( @@ -86,6 +86,7 @@ impl MigrateStatsToV2SubCommand { None, rpc_account_shutdown_recevier, 1, + flush_sender, flush_receiver, ) .context("Error spawning stat buffer")? diff --git a/web3_proxy/src/sub_commands/proxyd.rs b/web3_proxy/src/sub_commands/proxyd.rs index 272c7d9c..e55c4576 100644 --- a/web3_proxy/src/sub_commands/proxyd.rs +++ b/web3_proxy/src/sub_commands/proxyd.rs @@ -14,7 +14,7 @@ use std::{fs, thread}; use tokio::select; use tokio::sync::{broadcast, oneshot}; use tokio::time::{sleep_until, Instant}; -use tracing::{error, info, trace, warn}; +use tracing::{debug, error, info, trace, warn}; /// start the main proxy daemon #[derive(FromArgs, PartialEq, Debug, Eq)] @@ -42,7 +42,7 @@ impl ProxydSubCommand { let frontend_port = Arc::new(self.port.into()); let prometheus_port = Arc::new(self.prometheus_port.into()); - let (_flush_stat_buffer_sender, flush_stat_buffer_receiver) = flume::bounded(1); + let (flush_stat_buffer_sender, flush_stat_buffer_receiver) = flume::bounded(8); Self::_main( top_config, @@ -51,12 +51,14 @@ impl ProxydSubCommand { prometheus_port, num_workers, frontend_shutdown_sender, + flush_stat_buffer_sender, flush_stat_buffer_receiver, ) .await } /// this shouldn't really be pub except it makes test fixtures easier + #[allow(clippy::too_many_arguments)] pub async fn _main( top_config: TopConfig, top_config_path: Option, @@ -64,6 +66,7 @@ impl ProxydSubCommand { prometheus_port: Arc, num_workers: usize, frontend_shutdown_sender: broadcast::Sender<()>, + flush_stat_buffer_sender: flume::Sender>, flush_stat_buffer_receiver: flume::Receiver>, ) -> anyhow::Result<()> { // tokio has code for catching ctrl+c so we use that to shut down in most cases @@ -85,6 +88,7 @@ impl ProxydSubCommand { top_config.clone(), num_workers, app_shutdown_sender.clone(), + flush_stat_buffer_sender, flush_stat_buffer_receiver, ) .await?; @@ -102,6 +106,9 @@ impl ProxydSubCommand { Ok(new_top_config) => match toml::from_str::(&new_top_config) { Ok(new_top_config) => { if new_top_config != current_config { + trace!("current_config: {:#?}", current_config); + trace!("new_top_config: {:#?}", new_top_config); + // TODO: print the differences // TODO: first run seems to always see differences. why? info!("config @ {:?} changed", top_config_path); diff --git a/web3_proxy/tests/common/app.rs b/web3_proxy/tests/common/app.rs index e04188f9..1e75c030 100644 --- a/web3_proxy/tests/common/app.rs +++ b/web3_proxy/tests/common/app.rs @@ -290,6 +290,7 @@ impl TestApp { prometheus_port_arc, num_workers, shutdown_sender.clone(), + flush_stat_buffer_sender.clone(), flush_stat_buffer_receiver, )) };