From f5a1ac274a0a352a5b92968b47233cddf7597a96 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 13 Jun 2023 10:00:08 -0700 Subject: [PATCH] refactor send_head_block_result --- web3_proxy/src/app/mod.rs | 1 + .../bin/web3_proxy_cli/migrate_stats_to_v2.rs | 1 + web3_proxy/src/rpcs/many.rs | 34 ++++++++---------- web3_proxy/src/rpcs/one.rs | 35 +++++++++++-------- 4 files changed, 36 insertions(+), 35 deletions(-) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 41747e1a..a1df6c2a 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -388,6 +388,7 @@ impl Web3ProxyApp { top_config .app .influxdb_bucket + .as_ref() .expect("influxdb_bucket needed when influxdb_host is set"); let influxdb_client = diff --git a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs index 175aefdf..05ad957b 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs @@ -60,6 +60,7 @@ impl MigrateStatsToV2 { top_config .app .influxdb_bucket + .as_ref() .expect("influxdb_token needed when influxdb_host is set"); let influxdb_client = diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 6828db52..0e0b13fd 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -222,15 +222,9 @@ impl Web3Rpcs { self.by_name.store(Arc::new(new_by_name)); if let Some(old_rpc) = old_rpc { - if old_rpc - .head_block_sender - .as_ref() - .unwrap() - .borrow() - .is_some() - { + if old_rpc.head_block.as_ref().unwrap().borrow().is_some() { let mut new_head_receiver = - rpc.head_block_sender.as_ref().unwrap().subscribe(); + rpc.head_block.as_ref().unwrap().subscribe(); debug!("waiting for new {} to sync", rpc); // TODO: maximum wait time or this could block things for too long @@ -1373,42 +1367,42 @@ mod tests { Web3Rpc { name: "a".to_string(), tier: 0.into(), - head_block_sender: Some(tx_a), + head_block: Some(tx_a), peak_latency: Some(new_peak_latency()), ..Default::default() }, Web3Rpc { name: "b".to_string(), tier: 0.into(), - head_block_sender: Some(tx_b), + head_block: Some(tx_b), peak_latency: Some(new_peak_latency()), ..Default::default() }, Web3Rpc { name: "c".to_string(), tier: 0.into(), - head_block_sender: Some(tx_c), + head_block: Some(tx_c), peak_latency: Some(new_peak_latency()), ..Default::default() }, Web3Rpc { name: "d".to_string(), tier: 1.into(), - head_block_sender: Some(tx_d), + head_block: Some(tx_d), peak_latency: Some(new_peak_latency()), ..Default::default() }, Web3Rpc { name: "e".to_string(), tier: 1.into(), - head_block_sender: Some(tx_e), + head_block: Some(tx_e), peak_latency: Some(new_peak_latency()), ..Default::default() }, Web3Rpc { name: "f".to_string(), tier: 1.into(), - head_block_sender: Some(tx_f), + head_block: Some(tx_f), peak_latency: Some(new_peak_latency()), ..Default::default() }, @@ -1464,7 +1458,7 @@ mod tests { backup: false, block_data_limit: block_data_limit.into(), // tier: 0, - head_block_sender: Some(tx_synced), + head_block: Some(tx_synced), peak_latency: Some(new_peak_latency()), ..Default::default() }; @@ -1478,7 +1472,7 @@ mod tests { backup: false, block_data_limit: block_data_limit.into(), // tier: 0, - head_block_sender: Some(tx_lagged), + head_block: Some(tx_lagged), peak_latency: Some(new_peak_latency()), ..Default::default() }; @@ -1748,7 +1742,7 @@ mod tests { backup: false, block_data_limit: 64.into(), // tier: 1, - head_block_sender: Some(tx_pruned), + head_block: Some(tx_pruned), ..Default::default() }; @@ -1761,7 +1755,7 @@ mod tests { backup: false, block_data_limit: u64::MAX.into(), // tier: 2, - head_block_sender: Some(tx_archive), + head_block: Some(tx_archive), ..Default::default() }; @@ -1930,7 +1924,7 @@ mod tests { backup: false, block_data_limit: 64.into(), // tier: 0, - head_block_sender: Some(tx_mock_geth), + head_block: Some(tx_mock_geth), peak_latency: Some(new_peak_latency()), ..Default::default() }; @@ -1942,7 +1936,7 @@ mod tests { backup: false, block_data_limit: u64::MAX.into(), // tier: 1, - head_block_sender: Some(tx_mock_erigon_archive), + head_block: Some(tx_mock_erigon_archive), peak_latency: Some(new_peak_latency()), ..Default::default() }; diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 626003db..4af35499 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -62,7 +62,7 @@ pub struct Web3Rpc { /// TODO: have an enum for this so that "no limit" prints pretty? pub(super) block_data_limit: AtomicU64, /// head_block is only inside an Option so that the "Default" derive works. it will always be set. - pub(super) head_block_sender: Option>>, + pub(super) head_block: Option>>, /// Track head block latency pub(super) head_latency: RwLock, /// Track peak request latency @@ -197,7 +197,7 @@ impl Web3Rpc { display_name: config.display_name, hard_limit, hard_limit_until: Some(hard_limit_until), - head_block_sender: Some(head_block), + head_block: Some(head_block), http_provider, name, peak_latency: Some(peak_latency), @@ -234,7 +234,7 @@ impl Web3Rpc { /// TODO: should this return a struct that implements sorting traits? fn sort_on(&self, max_block: Option) -> (bool, u32, Reverse) { let mut head_block = self - .head_block_sender + .head_block .as_ref() .and_then(|x| x.borrow().as_ref().map(|x| *x.number())) .unwrap_or_default(); @@ -380,7 +380,7 @@ impl Web3Rpc { /// TODO: get rid of this now that consensus rpcs does it pub fn has_block_data(&self, needed_block_num: &U64) -> bool { - let head_block_num = match self.head_block_sender.as_ref().unwrap().borrow().as_ref() { + let head_block_num = match self.head_block.as_ref().unwrap().borrow().as_ref() { None => return false, Some(x) => *x.number(), }; @@ -449,10 +449,10 @@ impl Web3Rpc { pub(crate) async fn send_head_block_result( self: &Arc, new_head_block: Web3ProxyResult>, - block_sender: &flume::Sender, + block_and_rpc_sender: &flume::Sender, block_map: &BlocksByHashCache, ) -> Web3ProxyResult<()> { - let head_block_sender = self.head_block_sender.as_ref().unwrap(); + let head_block_sender = self.head_block.as_ref().unwrap(); let new_head_block = match new_head_block { Ok(x) => { @@ -469,8 +469,11 @@ impl Web3Rpc { debug!("clearing head block on {} ({}ms old)!", self, age); + // send an empty block to take this server out of rotation head_block_sender.send_replace(None); + // TODO: clear self.block_data_limit? + None } Some(new_head_block) => { @@ -481,8 +484,7 @@ impl Web3Rpc { .get_with_by_ref(&new_hash, async move { new_head_block }) .await; - // save the block so we don't send the same one multiple times - // also save so that archive checks can know how far back to query + // we are synced! yey! head_block_sender.send_replace(Some(new_head_block.clone())); if self.block_data_limit() == U64::zero() { @@ -501,17 +503,20 @@ impl Web3Rpc { Err(err) => { warn!("unable to get block from {}. err={:?}", self, err); + // send an empty block to take this server out of rotation head_block_sender.send_replace(None); + // TODO: clear self.block_data_limit? + None } }; - // send an empty block to take this server out of rotation - block_sender + // tell web3rpcs about this rpc having this block + block_and_rpc_sender .send_async((new_head_block, self.clone())) .await - .context("block_sender")?; + .context("block_and_rpc_sender failed sending")?; Ok(()) } @@ -524,7 +529,7 @@ impl Web3Rpc { self: &Arc, error_handler: Option, ) -> Web3ProxyResult<()> { - let head_block = self.head_block_sender.as_ref().unwrap().borrow().clone(); + let head_block = self.head_block.as_ref().unwrap().borrow().clone(); if let Some(head_block) = head_block { let head_block = head_block.block; @@ -1110,7 +1115,7 @@ impl Serialize for Web3Rpc { // TODO: maybe this is too much data. serialize less? { - let head_block = self.head_block_sender.as_ref().unwrap(); + let head_block = self.head_block.as_ref().unwrap(); let head_block = head_block.borrow(); let head_block = head_block.as_ref(); state.serialize_field("head_block", &head_block)?; @@ -1192,7 +1197,7 @@ mod tests { automatic_block_limit: false, backup: false, block_data_limit: block_data_limit.into(), - head_block_sender: Some(tx), + head_block: Some(tx), ..Default::default() }; @@ -1226,7 +1231,7 @@ mod tests { automatic_block_limit: false, backup: false, block_data_limit: block_data_limit.into(), - head_block_sender: Some(tx), + head_block: Some(tx), ..Default::default() };