only check if we have block if we subscribe

This commit is contained in:
Bryan Stitt 2023-11-01 22:45:20 -07:00
parent 715e96119a
commit 53b4eacaa7

View File

@ -1167,84 +1167,86 @@ impl Web3Rpc {
return Ok(OpenRequestResult::Failed); return Ok(OpenRequestResult::Failed);
} }
// make sure this block has the oldest block that this request needs if self.block_and_rpc_sender.is_some() {
if let Some(block_needed) = web3_request.min_block_needed() { // make sure this rpc has the oldest block that this request needs
if !self.has_block_data(block_needed) { if let Some(block_needed) = web3_request.min_block_needed() {
trace!(%web3_request, %block_needed, "{} cannot serve this request. Missing min block", self); if !self.has_block_data(block_needed) {
return Ok(OpenRequestResult::Failed); trace!(%web3_request, %block_needed, "{} cannot serve this request. Missing min block", self);
}
}
// make sure this block has the newest block that this request needs
if let Some(block_needed) = web3_request.max_block_needed() {
if !self.has_block_data(block_needed) {
trace!(%web3_request, %block_needed, "{} cannot serve this request. Missing max block", self);
let rpc = self.clone();
let connect_timeout_at = web3_request.connect_timeout_at();
let mut head_block_receiver =
self.head_block_sender.as_ref().unwrap().subscribe();
// if head_block is far behind block_needed, return now
// TODO: future block limit from the config
if let Some(head_block) = head_block_receiver.borrow_and_update().as_ref() {
let head_block_number = head_block.number();
if head_block_number + U64::from(5) < block_needed {
return Err(Web3ProxyError::FarFutureBlock {
head: Some(head_block_number),
requested: block_needed,
});
}
} else {
return Ok(OpenRequestResult::Failed); return Ok(OpenRequestResult::Failed);
} }
}
// create a future that resolves once this rpc can serve this request // make sure this rpc has the newest block that this request needs
// TODO: i don't love this future. think about it more if let Some(block_needed) = web3_request.max_block_needed() {
let synced_f = async move { if !self.has_block_data(block_needed) {
loop { trace!(%web3_request, %block_needed, "{} cannot serve this request. Missing max block", self);
select! {
_ = head_block_receiver.changed() => {
let head_block = head_block_receiver.borrow_and_update();
if let Some(head_block_number) = head_block.as_ref().map(|x| x.number()) { let rpc = self.clone();
if head_block_number >= block_needed { let connect_timeout_at = web3_request.connect_timeout_at();
trace!("the block we needed has arrived!");
return Ok(rpc); let mut head_block_receiver =
self.head_block_sender.as_ref().unwrap().subscribe();
// if head_block is far behind block_needed, return now
// TODO: future block limit from the config
if let Some(head_block) = head_block_receiver.borrow_and_update().as_ref() {
let head_block_number = head_block.number();
if head_block_number + U64::from(5) < block_needed {
return Err(Web3ProxyError::FarFutureBlock {
head: Some(head_block_number),
requested: block_needed,
});
}
} else {
return Ok(OpenRequestResult::Failed);
}
// create a future that resolves once this rpc can serve this request
// TODO: i don't love this future. think about it more
let synced_f = async move {
loop {
select! {
_ = head_block_receiver.changed() => {
let head_block = head_block_receiver.borrow_and_update();
if let Some(head_block_number) = head_block.as_ref().map(|x| x.number()) {
if head_block_number >= block_needed {
trace!("the block we needed has arrived!");
return Ok(rpc);
}
} else {
// TODO: what should we do? this server has no blocks at all. we can wait, but i think exiting now is best
error!("no head block during try_request_handle on {}", rpc);
break;
} }
} else { }
// TODO: what should we do? this server has no blocks at all. we can wait, but i think exiting now is best _ = sleep_until(connect_timeout_at) => {
error!("no head block during try_request_handle on {}", rpc); error!("connection timeout on {}", rpc);
break; break;
} }
} }
_ = sleep_until(connect_timeout_at) => {
error!("connection timeout on {}", rpc);
break;
}
} }
}
if let Some(head_block_number) = head_block_receiver if let Some(head_block_number) = head_block_receiver
.borrow_and_update() .borrow_and_update()
.as_ref() .as_ref()
.map(|x| x.number()) .map(|x| x.number())
{ {
Err(Web3ProxyError::FarFutureBlock { Err(Web3ProxyError::FarFutureBlock {
head: Some(head_block_number), head: Some(head_block_number),
requested: block_needed, requested: block_needed,
}) })
} else { } else {
Err(Web3ProxyError::FarFutureBlock { Err(Web3ProxyError::FarFutureBlock {
head: None, head: None,
requested: block_needed, requested: block_needed,
}) })
} }
}; };
return Ok(OpenRequestResult::Lagged(Box::pin(synced_f))); return Ok(OpenRequestResult::Lagged(Box::pin(synced_f)));
}
} }
} }
} }