allow_unhealthy should also skip checking block heights

Commit: ba7eba41db (parent: e11e6f6f27)
Author: Bryan Stitt
Date: 2023-10-13 20:26:02 -07:00

View File

@@ -737,9 +737,19 @@ impl Web3Rpc {
         trace!("starting subscriptions on {}", self);

-        self.check_provider(chain_id)
+        // start optimistically
+        // we need healthy to start at "true" or else check_provider and other things will not send requests
+        // self.healthy.store(true, atomic::Ordering::Relaxed);
+
+        // TODO: this should set healthy to false. its not strictly necessary since block checks will fail, but this is a faster check
+        if let Err(err) = self
+            .check_provider(chain_id)
             .await
-            .web3_context("failed check_provider")?;
+            .web3_context("failed check_provider")
+        {
+            self.healthy.store(false, atomic::Ordering::Relaxed);
+            return Err(err);
+        }

         let mut futures = Vec::new();
         let mut abort_handles = vec![];
@@ -810,8 +820,6 @@ impl Web3Rpc {
             tokio::spawn(f)
         } else {
-            self.healthy.store(true, atomic::Ordering::Relaxed);
-
             let rpc = self.clone();
             let health_sleep_seconds = 60;
@@ -1153,69 +1161,70 @@ impl Web3Rpc {
     ) -> Web3ProxyResult<OpenRequestResult> {
         // TODO: if websocket is reconnecting, return an error?

-        // TODO: what if this is a health check?!
-        if !(self.healthy.load(atomic::Ordering::Relaxed) || allow_unhealthy) {
-            return Ok(OpenRequestResult::Failed);
-        }
+        if !allow_unhealthy {
+            if !(self.healthy.load(atomic::Ordering::Relaxed)) {
+                return Ok(OpenRequestResult::Failed);
+            }

-        // make sure this block has the oldest block that this request needs
-        // TODO: should this check be optional? we've probably already done it for RpcForRuest::inner. for now its fine to duplicate the check
-        if let Some(block_needed) = web3_request.min_block_needed() {
-            if !self.has_block_data(block_needed) {
-                trace!(%web3_request, %block_needed, "{} cannot serve this request. Missing min block", self);
-                return Ok(OpenRequestResult::Failed);
-            }
-        }
+            // make sure this block has the oldest block that this request needs
+            // TODO: should this check be optional? we've probably already done it for RpcForRuest::inner. for now its fine to duplicate the check
+            if let Some(block_needed) = web3_request.min_block_needed() {
+                if !self.has_block_data(block_needed) {
+                    trace!(%web3_request, %block_needed, "{} cannot serve this request. Missing min block", self);
+                    return Ok(OpenRequestResult::Failed);
+                }
+            }

-        // make sure this block has the newest block that this request needs
-        // TODO: should this check be optional? we've probably already done it for RpcForRuest::inner. for now its fine to duplicate the check
-        if let Some(block_needed) = web3_request.max_block_needed() {
-            if !self.has_block_data(block_needed) {
-                trace!(%web3_request, %block_needed, "{} cannot serve this request. Missing max block", self);
+            // make sure this block has the newest block that this request needs
+            // TODO: should this check be optional? we've probably already done it for RpcForRuest::inner. for now its fine to duplicate the check
+            if let Some(block_needed) = web3_request.max_block_needed() {
+                if !self.has_block_data(block_needed) {
+                    trace!(%web3_request, %block_needed, "{} cannot serve this request. Missing max block", self);

-            let clone = self.clone();
-            let connect_timeout_at = web3_request.connect_timeout_at();
+                    let clone = self.clone();
+                    let connect_timeout_at = web3_request.connect_timeout_at();

-            // create a future that resolves once this rpc can serve this request
-            // TODO: i don't love this future. think about it more
-            let synced_f = async move {
-                let mut head_block_receiver =
-                    clone.head_block_sender.as_ref().unwrap().subscribe();
+                    // create a future that resolves once this rpc can serve this request
+                    // TODO: i don't love this future. think about it more
+                    let synced_f = async move {
+                        let mut head_block_receiver =
+                            clone.head_block_sender.as_ref().unwrap().subscribe();

-                // TODO: if head_block is far behind block_needed, retrurn now
-                head_block_receiver.borrow_and_update();
+                        // TODO: if head_block is far behind block_needed, retrurn now
+                        head_block_receiver.borrow_and_update();

-                loop {
-                    select! {
-                        _ = head_block_receiver.changed() => {
-                            if let Some(head_block_number) = head_block_receiver.borrow_and_update().as_ref().map(|x| x.number()) {
-                                if head_block_number >= block_needed {
-                                    trace!("the block we needed has arrived!");
-                                    break;
-                                }
+                        loop {
+                            select! {
+                                _ = head_block_receiver.changed() => {
+                                    if let Some(head_block_number) = head_block_receiver.borrow_and_update().as_ref().map(|x| x.number()) {
+                                        if head_block_number >= block_needed {
+                                            trace!("the block we needed has arrived!");
+                                            break;
+                                        }

-                                // wait up to 2 blocks
-                                // TODO: configurable wait per chain
-                                if head_block_number + U64::from(2) < block_needed {
-                                    return Err(Web3ProxyError::FarFutureBlock { head: head_block_number, requested: block_needed });
-                                }
-                            } else {
-                                // TODO: what should we do? this server has no blocks at all. we can wait, but i think exiting now is best
-                                // yield_now().await;
-                                error!("no head block during try_request_handle on {}", clone);
-                                return Err(Web3ProxyError::NoServersSynced);
-                            }
-                        }
-                        _ = sleep_until(connect_timeout_at) => {
-                            error!("connection timeout on {}", clone);
-                            return Err(Web3ProxyError::NoServersSynced);
-                        }
-                    }
-                }
+                                        // wait up to 2 blocks
+                                        // TODO: configurable wait per chain
+                                        if head_block_number + U64::from(2) < block_needed {
+                                            return Err(Web3ProxyError::FarFutureBlock { head: head_block_number, requested: block_needed });
+                                        }
+                                    } else {
+                                        // TODO: what should we do? this server has no blocks at all. we can wait, but i think exiting now is best
+                                        // yield_now().await;
+                                        error!("no head block during try_request_handle on {}", clone);
+                                        return Err(Web3ProxyError::NoServersSynced);
+                                    }
+                                }
+                                _ = sleep_until(connect_timeout_at) => {
+                                    error!("connection timeout on {}", clone);
+                                    return Err(Web3ProxyError::NoServersSynced);
+                                }
+                            }
+                        }

-                Ok(clone)
-            };
+                        Ok(clone)
+                    };

-            return Ok(OpenRequestResult::Lagged(Box::pin(synced_f)));
+                    return Ok(OpenRequestResult::Lagged(Box::pin(synced_f)));
+                }
             }
         }
-    }