From b5ed0c471052d13fa9b4f9cd3f5acfbfefc200ef Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 10 Apr 2023 22:28:31 -0700 Subject: [PATCH] sometimes watch_for_block instead of returning an error --- web3_proxy/src/rpcs/many.rs | 81 +++++++++++++++++++++++++++++++++---- 1 file changed, 74 insertions(+), 7 deletions(-) diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index d461063c..5415d787 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -1001,8 +1001,15 @@ impl Web3Rpcs { request_metadata.no_servers.fetch_add(1, Ordering::Release); } - // TODO: don't break. wait up to X seconds for a sever - break; + let waiting_for = min_block_needed.max(max_block_needed); + + if watch_for_block(waiting_for, &mut watch_consensus_connections).await? { + // block found! continue so we can check for another rpc + continue; + } else { + // block won't be found without new servers being added + break; + } } } } @@ -1015,6 +1022,7 @@ impl Web3Rpcs { } if let Some(r) = method_not_available_response { + // TODO: this error response is likely the user's fault. do we actually want it marked as an error? maybe keep user and server error bools? // TODO: emit a stat for unsupported methods? it would be best to block them at the proxy instead of at the backend return Ok(r); } @@ -1022,25 +1030,33 @@ impl Web3Rpcs { let num_conns = self.by_name.read().len(); let num_skipped = skip_rpcs.len(); + let needed = min_block_needed.max(max_block_needed); + let head_block_num = watch_consensus_connections .borrow() .as_ref() .map(|x| *x.head_block.number()); // TODO: error? warn? debug? trace? - if num_skipped == 0 { + if head_block_num.is_none() { error!( - "No servers synced (min {:?}, max {:?}, head {:?}) ({} known). None skipped", + "No servers synced (min {:?}, max {:?}, head {:?}) ({} known)", min_block_needed, max_block_needed, head_block_num, num_conns ); - - // TODO: remove this, or move to trace level - // debug!("{}", serde_json::to_string(&request).unwrap()); + } else if head_block_num > needed.copied() { + // we have synced past the needed block + error!( + "No archive servers synced (min {:?}, max {:?}, head {:?}) ({} known)", + min_block_needed, max_block_needed, head_block_num, num_conns + ); + } else if num_skipped == 0 { } else { error!( "Requested data is not available (min {:?}, max {:?}, head {:?}) ({} skipped, {} known)", min_block_needed, max_block_needed, head_block_num, num_skipped, num_conns ); + // TODO: remove this, or move to trace level + // debug!("{}", serde_json::to_string(&request).unwrap()); } // TODO: what error code? @@ -1860,3 +1876,54 @@ mod tests { ) } } + +/// returns `true` when the desired block number is available +/// TODO: max wait time? max number of blocks to wait for? time is probably best +async fn watch_for_block( + block_num: Option<&U64>, + watch_consensus_connections: &mut watch::Receiver>>, +) -> Web3ProxyResult { + let mut head_block_num = watch_consensus_connections + .borrow_and_update() + .as_ref() + .map(|x| *x.head_block.number()); + + match (block_num, head_block_num) { + (Some(x), Some(ref head)) => { + if x <= head { + // we are past this block and no servers have this block + // this happens if the block is old and all archive servers are offline + // there is no chance we will get this block without adding an archive server to the config + return Ok(false); + } + } + (None, None) => { + // i don't think this is possible + // maybe if we internally make a request for the latest block and all our servers are disconnected? + warn!("how'd this None/None happen?"); + return Ok(false); + } + (Some(_), None) => { + // block requested but no servers synced. we will wait + } + (None, Some(head)) => { + // i don't think this is possible + // maybe if we internally make a request for the latest block and all our servers are disconnected? + warn!("how'd this None/Some({}) happen?", head); + return Ok(false); + } + }; + + // future block is requested + // wait for the block to arrive + while head_block_num < block_num.copied() { + watch_consensus_connections.changed().await?; + + head_block_num = watch_consensus_connections + .borrow_and_update() + .as_ref() + .map(|x| *x.head_block.number()); + } + + Ok(true) +}