sometimes watch_for_block instead of returning an error

This commit is contained in:
Bryan Stitt 2023-04-10 22:28:31 -07:00
parent ed5ca0575c
commit b5ed0c4710

@ -1001,8 +1001,15 @@ impl Web3Rpcs {
request_metadata.no_servers.fetch_add(1, Ordering::Release);
}
// TODO: don't break. wait up to X seconds for a sever
break;
let waiting_for = min_block_needed.max(max_block_needed);
if watch_for_block(waiting_for, &mut watch_consensus_connections).await? {
// block found! continue so we can check for another rpc
continue;
} else {
// block won't be found without new servers being added
break;
}
}
}
}
@ -1015,6 +1022,7 @@ impl Web3Rpcs {
}
if let Some(r) = method_not_available_response {
// TODO: this error response is likely the user's fault. do we actually want it marked as an error? maybe keep user and server error bools?
// TODO: emit a stat for unsupported methods? it would be best to block them at the proxy instead of at the backend
return Ok(r);
}
@ -1022,25 +1030,33 @@ impl Web3Rpcs {
let num_conns = self.by_name.read().len();
let num_skipped = skip_rpcs.len();
let needed = min_block_needed.max(max_block_needed);
let head_block_num = watch_consensus_connections
.borrow()
.as_ref()
.map(|x| *x.head_block.number());
// TODO: error? warn? debug? trace?
if num_skipped == 0 {
if head_block_num.is_none() {
error!(
"No servers synced (min {:?}, max {:?}, head {:?}) ({} known). None skipped",
"No servers synced (min {:?}, max {:?}, head {:?}) ({} known)",
min_block_needed, max_block_needed, head_block_num, num_conns
);
// TODO: remove this, or move to trace level
// debug!("{}", serde_json::to_string(&request).unwrap());
} else if head_block_num > needed.copied() {
// we have synced past the needed block
error!(
"No archive servers synced (min {:?}, max {:?}, head {:?}) ({} known)",
min_block_needed, max_block_needed, head_block_num, num_conns
);
} else if num_skipped == 0 {
} else {
error!(
"Requested data is not available (min {:?}, max {:?}, head {:?}) ({} skipped, {} known)",
min_block_needed, max_block_needed, head_block_num, num_skipped, num_conns
);
// TODO: remove this, or move to trace level
// debug!("{}", serde_json::to_string(&request).unwrap());
}
// TODO: what error code?
@ -1860,3 +1876,54 @@ mod tests {
)
}
}
/// returns `true` when the desired block number is available
/// TODO: max wait time? max number of blocks to wait for? time is probably best
async fn watch_for_block(
block_num: Option<&U64>,
watch_consensus_connections: &mut watch::Receiver<Option<Arc<ConsensusWeb3Rpcs>>>,
) -> Web3ProxyResult<bool> {
let mut head_block_num = watch_consensus_connections
.borrow_and_update()
.as_ref()
.map(|x| *x.head_block.number());
match (block_num, head_block_num) {
(Some(x), Some(ref head)) => {
if x <= head {
// we are past this block and no servers have this block
// this happens if the block is old and all archive servers are offline
// there is no chance we will get this block without adding an archive server to the config
return Ok(false);
}
}
(None, None) => {
// i don't think this is possible
// maybe if we internally make a request for the latest block and all our servers are disconnected?
warn!("how'd this None/None happen?");
return Ok(false);
}
(Some(_), None) => {
// block requested but no servers synced. we will wait
}
(None, Some(head)) => {
// i don't think this is possible
// maybe if we internally make a request for the latest block and all our servers are disconnected?
warn!("how'd this None/Some({}) happen?", head);
return Ok(false);
}
};
// future block is requested
// wait for the block to arrive
while head_block_num < block_num.copied() {
watch_consensus_connections.changed().await?;
head_block_num = watch_consensus_connections
.borrow_and_update()
.as_ref()
.map(|x| *x.head_block.number());
}
Ok(true)
}