diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 0e0b13fd..6828db52 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -222,9 +222,15 @@ impl Web3Rpcs { self.by_name.store(Arc::new(new_by_name)); if let Some(old_rpc) = old_rpc { - if old_rpc.head_block.as_ref().unwrap().borrow().is_some() { + if old_rpc + .head_block_sender + .as_ref() + .unwrap() + .borrow() + .is_some() + { let mut new_head_receiver = - rpc.head_block.as_ref().unwrap().subscribe(); + rpc.head_block_sender.as_ref().unwrap().subscribe(); debug!("waiting for new {} to sync", rpc); // TODO: maximum wait time or this could block things for too long @@ -1367,42 +1373,42 @@ mod tests { Web3Rpc { name: "a".to_string(), tier: 0.into(), - head_block: Some(tx_a), + head_block_sender: Some(tx_a), peak_latency: Some(new_peak_latency()), ..Default::default() }, Web3Rpc { name: "b".to_string(), tier: 0.into(), - head_block: Some(tx_b), + head_block_sender: Some(tx_b), peak_latency: Some(new_peak_latency()), ..Default::default() }, Web3Rpc { name: "c".to_string(), tier: 0.into(), - head_block: Some(tx_c), + head_block_sender: Some(tx_c), peak_latency: Some(new_peak_latency()), ..Default::default() }, Web3Rpc { name: "d".to_string(), tier: 1.into(), - head_block: Some(tx_d), + head_block_sender: Some(tx_d), peak_latency: Some(new_peak_latency()), ..Default::default() }, Web3Rpc { name: "e".to_string(), tier: 1.into(), - head_block: Some(tx_e), + head_block_sender: Some(tx_e), peak_latency: Some(new_peak_latency()), ..Default::default() }, Web3Rpc { name: "f".to_string(), tier: 1.into(), - head_block: Some(tx_f), + head_block_sender: Some(tx_f), peak_latency: Some(new_peak_latency()), ..Default::default() }, @@ -1458,7 +1464,7 @@ mod tests { backup: false, block_data_limit: block_data_limit.into(), // tier: 0, - head_block: Some(tx_synced), + head_block_sender: Some(tx_synced), peak_latency: Some(new_peak_latency()), ..Default::default() }; @@ -1472,7 +1478,7 @@ mod tests { backup: false, block_data_limit: block_data_limit.into(), // tier: 0, - head_block: Some(tx_lagged), + head_block_sender: Some(tx_lagged), peak_latency: Some(new_peak_latency()), ..Default::default() }; @@ -1742,7 +1748,7 @@ mod tests { backup: false, block_data_limit: 64.into(), // tier: 1, - head_block: Some(tx_pruned), + head_block_sender: Some(tx_pruned), ..Default::default() }; @@ -1755,7 +1761,7 @@ mod tests { backup: false, block_data_limit: u64::MAX.into(), // tier: 2, - head_block: Some(tx_archive), + head_block_sender: Some(tx_archive), ..Default::default() }; @@ -1924,7 +1930,7 @@ mod tests { backup: false, block_data_limit: 64.into(), // tier: 0, - head_block: Some(tx_mock_geth), + head_block_sender: Some(tx_mock_geth), peak_latency: Some(new_peak_latency()), ..Default::default() }; @@ -1936,7 +1942,7 @@ mod tests { backup: false, block_data_limit: u64::MAX.into(), // tier: 1, - head_block: Some(tx_mock_erigon_archive), + head_block_sender: Some(tx_mock_erigon_archive), peak_latency: Some(new_peak_latency()), ..Default::default() }; diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 937bd82a..626003db 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -62,7 +62,7 @@ pub struct Web3Rpc { /// TODO: have an enum for this so that "no limit" prints pretty? pub(super) block_data_limit: AtomicU64, /// head_block is only inside an Option so that the "Default" derive works. it will always be set. - pub(super) head_block: Option>>, + pub(super) head_block_sender: Option>>, /// Track head block latency pub(super) head_latency: RwLock, /// Track peak request latency @@ -197,7 +197,7 @@ impl Web3Rpc { display_name: config.display_name, hard_limit, hard_limit_until: Some(hard_limit_until), - head_block: Some(head_block), + head_block_sender: Some(head_block), http_provider, name, peak_latency: Some(peak_latency), @@ -234,7 +234,7 @@ impl Web3Rpc { /// TODO: should this return a struct that implements sorting traits? fn sort_on(&self, max_block: Option) -> (bool, u32, Reverse) { let mut head_block = self - .head_block + .head_block_sender .as_ref() .and_then(|x| x.borrow().as_ref().map(|x| *x.number())) .unwrap_or_default(); @@ -380,7 +380,7 @@ impl Web3Rpc { /// TODO: get rid of this now that consensus rpcs does it pub fn has_block_data(&self, needed_block_num: &U64) -> bool { - let head_block_num = match self.head_block.as_ref().unwrap().borrow().as_ref() { + let head_block_num = match self.head_block_sender.as_ref().unwrap().borrow().as_ref() { None => return false, Some(x) => *x.number(), }; @@ -452,56 +452,56 @@ impl Web3Rpc { block_sender: &flume::Sender, block_map: &BlocksByHashCache, ) -> Web3ProxyResult<()> { + let head_block_sender = self.head_block_sender.as_ref().unwrap(); + let new_head_block = match new_head_block { - Ok(None) => { - let head_block_tx = self.head_block.as_ref().unwrap(); + Ok(x) => { + let x = x.and_then(Web3ProxyBlock::try_new); - if head_block_tx.borrow().is_none() { - // we previously sent a None. return early - return Ok(()); - } + match x { + None => { + if head_block_sender.borrow().is_none() { + // we previously sent a None. return early + return Ok(()); + } - let age = self.created_at.unwrap().elapsed().as_millis(); + let age = self.created_at.unwrap().elapsed().as_millis(); - debug!("clearing head block on {} ({}ms old)!", self, age); + debug!("clearing head block on {} ({}ms old)!", self, age); - head_block_tx.send_replace(None); + head_block_sender.send_replace(None); - None - } - Ok(Some(new_head_block)) => { - let new_head_block = Web3ProxyBlock::try_new(new_head_block) - .expect("blocks from newHeads subscriptions should also convert"); + None + } + Some(new_head_block) => { + let new_hash = *new_head_block.hash(); - let new_hash = *new_head_block.hash(); + // if we already have this block saved, set new_head_block to that arc. otherwise store this copy + let new_head_block = block_map + .get_with_by_ref(&new_hash, async move { new_head_block }) + .await; - // if we already have this block saved, set new_head_block to that arc. otherwise store this copy - let new_head_block = block_map - .get_with_by_ref(&new_hash, async move { new_head_block }) - .await; + // save the block so we don't send the same one multiple times + // also save so that archive checks can know how far back to query + head_block_sender.send_replace(Some(new_head_block.clone())); - // save the block so we don't send the same one multiple times - // also save so that archive checks can know how far back to query - self.head_block - .as_ref() - .unwrap() - .send_replace(Some(new_head_block.clone())); + if self.block_data_limit() == U64::zero() { + if let Err(err) = self.check_block_data_limit().await { + warn!( + "failed checking block limit after {} finished syncing. {:?}", + self, err + ); + } + } - if self.block_data_limit() == U64::zero() { - if let Err(err) = self.check_block_data_limit().await { - warn!( - "failed checking block limit after {} finished syncing. {:?}", - self, err - ); + Some(new_head_block) } } - - Some(new_head_block) } Err(err) => { warn!("unable to get block from {}. err={:?}", self, err); - self.head_block.as_ref().unwrap().send_replace(None); + head_block_sender.send_replace(None); None } @@ -524,7 +524,7 @@ impl Web3Rpc { self: &Arc, error_handler: Option, ) -> Web3ProxyResult<()> { - let head_block = self.head_block.as_ref().unwrap().borrow().clone(); + let head_block = self.head_block_sender.as_ref().unwrap().borrow().clone(); if let Some(head_block) = head_block { let head_block = head_block.block; @@ -1110,7 +1110,7 @@ impl Serialize for Web3Rpc { // TODO: maybe this is too much data. serialize less? { - let head_block = self.head_block.as_ref().unwrap(); + let head_block = self.head_block_sender.as_ref().unwrap(); let head_block = head_block.borrow(); let head_block = head_block.as_ref(); state.serialize_field("head_block", &head_block)?; @@ -1192,7 +1192,7 @@ mod tests { automatic_block_limit: false, backup: false, block_data_limit: block_data_limit.into(), - head_block: Some(tx), + head_block_sender: Some(tx), ..Default::default() }; @@ -1226,7 +1226,7 @@ mod tests { automatic_block_limit: false, backup: false, block_data_limit: block_data_limit.into(), - head_block: Some(tx), + head_block_sender: Some(tx), ..Default::default() };