diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index 5f197201..c389e3eb 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -742,6 +742,9 @@ impl Web3Connection { let mut last_hash = H256::zero(); + // TODO: default to true? + let mut was_syncing = self.syncing(); + loop { // TODO: what should the max_wait be? match self @@ -761,6 +764,8 @@ impl Web3Connection { Ok(None) => { warn!("no head block on {}", self); + was_syncing = true; + self.send_head_block_result( Ok(None), &block_sender, @@ -784,6 +789,23 @@ impl Web3Connection { block_map.clone(), ) .await?; + + if was_syncing { + was_syncing = self.syncing(); + + if !was_syncing { + // we were syncing, but we aren't anymore + if let Err(err) = self + .check_block_data_limit(&authorization) + .await + { + warn!("unable to check block data limit after syncing ended. {:?}", err); + } + } + } else { + // TODO: it wasn't syncing, what if it is now? + was_syncing = true; + } } } Err(err) => { @@ -794,6 +816,8 @@ impl Web3Connection { block_map.clone(), ) .await?; + + was_syncing = true; } } } @@ -807,6 +831,8 @@ impl Web3Connection { ) .await?; + was_syncing = true; + // TODO: what should we do? sleep? extra time? } } @@ -860,25 +886,57 @@ impl Web3Connection { self.send_head_block_result(block, &block_sender, block_map.clone()) .await?; - while let Some(new_block) = stream.next().await { - // TODO: check the new block's hash to be sure we don't send dupes - let new_hash = new_block - .hash - .expect("blocks should always have a hash here"); + let mut was_syncing = self.syncing(); - if new_hash == last_hash { - // some rpcs like to give us duplicates. don't waste our time on them - continue; - } else { - last_hash = new_hash; + loop { + // TODO: timeout should be based on block time + // this timeout is here because + match timeout(Duration::from_secs(60), stream.next()).await { + Ok(Some(new_block)) => { + // TODO: check the new block's hash to be sure we don't send dupes + let new_hash = new_block + .hash + .expect("blocks should always have a hash here"); + + if new_hash == last_hash { + // some rpcs like to give us duplicates. don't waste our time on them + continue; + } else { + last_hash = new_hash; + } + + self.send_head_block_result( + Ok(Some(Arc::new(new_block))), + &block_sender, + block_map.clone(), + ) + .await?; + + if was_syncing { + was_syncing = self.syncing(); + + if !was_syncing { + // we were syncing, but we aren't anymore + if let Err(err) = + self.check_block_data_limit(&authorization).await + { + warn!("unable to check block data limit after syncing ended. {:?}", err); + break; + } + } + } else { + // TODO: it wasn't syncing, what if it is now? + } + } + Ok(None) => { + warn!("new_heads subscription to {} ended", self); + break; + } + Err(err) => { + warn!("{} timed out waiting for block! {:?}", self, err); + break; + } } - - self.send_head_block_result( - Ok(Some(Arc::new(new_block))), - &block_sender, - block_map.clone(), - ) - .await?; } // clear the head block. this might not be needed, but it won't hurt @@ -887,7 +945,6 @@ impl Web3Connection { // TODO: is this always an error? // TODO: we probably don't want a warn and to return error - warn!("new_heads subscription to {} ended", self); Err(anyhow::anyhow!("new_heads subscription ended")) } }