diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index f33741dc..24c2e724 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -40,7 +40,7 @@ pub struct Web3Connection { /// Lower weight are higher priority when sending requests pub(super) weight: u32, // TODO: async lock? - pub(super) head_block_id: RwLock, + pub(super) head_block_id: RwLock>, } impl Web3Connection { @@ -162,23 +162,27 @@ impl Web3Connection { let mut limit = None; for block_data_limit in [u64::MAX, 90_000, 128, 64, 32] { - let mut head_block_num = self.head_block_id.read().num; + let mut head_block_id = self.head_block_id.read().clone(); // TODO: wait until head block is set outside the loop? if we disconnect while starting we could actually get 0 though - while head_block_num == U64::zero() { + while head_block_id.is_none() { warn!(rpc=%self, "no head block yet. retrying"); // TODO: subscribe to a channel instead of polling? subscribe to http_interval_sender? sleep(Duration::from_secs(1)).await; - head_block_num = self.head_block_id.read().num; + head_block_id = self.head_block_id.read().clone(); } + let head_block_num = head_block_id.expect("is_none was checked above").num; + + debug_assert_ne!(head_block_num, U64::zero()); // TODO: subtract 1 from block_data_limit for safety? let maybe_archive_block = head_block_num .saturating_sub((block_data_limit).into()) .max(U64::one()); + // TODO: wait for the handle BEFORE we check the current block number. it might be delayed too! let archive_result: Result = self .wait_for_request_handle() .await? @@ -216,7 +220,12 @@ impl Web3Connection { pub fn has_block_data(&self, needed_block_num: &U64) -> bool { let block_data_limit: U64 = self.block_data_limit(); - let newest_block_num = self.head_block_id.read().num; + let head_block_id = self.head_block_id.read().clone(); + + let newest_block_num = match head_block_id { + None => return false, + Some(x) => x.num, + }; let oldest_block_num = newest_block_num .saturating_sub(block_data_limit) @@ -322,10 +331,20 @@ impl Web3Connection { // save the block so we don't send the same one multiple times // also save so that archive checks can know how far back to query { - let mut head_block = self.head_block_id.write(); + let mut head_block_id = self.head_block_id.write(); - head_block.hash = new_hash; - head_block.num = new_num; + if head_block_id.is_none() { + *head_block_id = Some(BlockId { + hash: new_hash, + num: new_num, + }); + } else { + head_block_id.as_mut().map(|x| { + x.hash = new_hash; + x.num = new_num; + x + }); + } } block_sender