move future block check outside the wait future
This commit is contained in:
parent
ac30e0b197
commit
20a0b99f3a
|
@ -72,10 +72,10 @@ pub enum Web3ProxyError {
|
||||||
EthersHttpClient(ethers::providers::HttpClientError),
|
EthersHttpClient(ethers::providers::HttpClientError),
|
||||||
EthersProvider(ethers::prelude::ProviderError),
|
EthersProvider(ethers::prelude::ProviderError),
|
||||||
EthersWsClient(ethers::prelude::WsClientError),
|
EthersWsClient(ethers::prelude::WsClientError),
|
||||||
#[display(fmt = "{} < {}", head, requested)]
|
#[display(fmt = "{:?} < {}", head, requested)]
|
||||||
#[from(ignore)]
|
#[from(ignore)]
|
||||||
FarFutureBlock {
|
FarFutureBlock {
|
||||||
head: U64,
|
head: Option<U64>,
|
||||||
requested: U64,
|
requested: U64,
|
||||||
},
|
},
|
||||||
GasEstimateNotU256,
|
GasEstimateNotU256,
|
||||||
|
|
|
@ -932,7 +932,7 @@ impl Web3Rpc {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Subscribe to new blocks.
|
/// Subscribe to new block headers.
|
||||||
async fn subscribe_new_heads(
|
async fn subscribe_new_heads(
|
||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
block_sender: mpsc::UnboundedSender<BlockAndRpc>,
|
block_sender: mpsc::UnboundedSender<BlockAndRpc>,
|
||||||
|
@ -940,7 +940,6 @@ impl Web3Rpc {
|
||||||
) -> Web3ProxyResult<()> {
|
) -> Web3ProxyResult<()> {
|
||||||
trace!("subscribing to new heads on {}", self);
|
trace!("subscribing to new heads on {}", self);
|
||||||
|
|
||||||
// TODO: different handler depending on backup or not
|
|
||||||
let error_handler = if self.backup {
|
let error_handler = if self.backup {
|
||||||
Some(Level::DEBUG.into())
|
Some(Level::DEBUG.into())
|
||||||
} else {
|
} else {
|
||||||
|
@ -955,9 +954,7 @@ impl Web3Rpc {
|
||||||
|
|
||||||
// query the block once since the subscription doesn't send the current block
|
// query the block once since the subscription doesn't send the current block
|
||||||
// there is a very small race condition here where the stream could send us a new block right now
|
// there is a very small race condition here where the stream could send us a new block right now
|
||||||
// but all seeing the same block twice won't break anything
|
// but sending the same block twice won't break anything
|
||||||
// TODO: how does this get wrapped in an arc? does ethers handle that?
|
|
||||||
// TODO: send this request to the ws_provider instead of the http_provider
|
|
||||||
let latest_block: Result<Option<ArcBlock>, _> = self
|
let latest_block: Result<Option<ArcBlock>, _> = self
|
||||||
.internal_request(
|
.internal_request(
|
||||||
"eth_getBlockByNumber".into(),
|
"eth_getBlockByNumber".into(),
|
||||||
|
@ -979,7 +976,6 @@ impl Web3Rpc {
|
||||||
} else if self.http_client.is_some() {
|
} else if self.http_client.is_some() {
|
||||||
// there is a "watch_blocks" function, but a lot of public nodes (including llamanodes) do not support the necessary rpc endpoints
|
// there is a "watch_blocks" function, but a lot of public nodes (including llamanodes) do not support the necessary rpc endpoints
|
||||||
// TODO: is 1/2 the block time okay?
|
// TODO: is 1/2 the block time okay?
|
||||||
// TODO: tokio-console shows this as 1ms, but it should definitely be more than that. block_interval definitely isn't 2ms
|
|
||||||
let mut i = interval(self.block_interval / 2);
|
let mut i = interval(self.block_interval / 2);
|
||||||
i.set_missed_tick_behavior(MissedTickBehavior::Skip);
|
i.set_missed_tick_behavior(MissedTickBehavior::Skip);
|
||||||
|
|
||||||
|
@ -1075,7 +1071,6 @@ impl Web3Rpc {
|
||||||
match self.try_throttle().await? {
|
match self.try_throttle().await? {
|
||||||
RedisRateLimitResult::Allowed(y) => return Ok(y),
|
RedisRateLimitResult::Allowed(y) => return Ok(y),
|
||||||
RedisRateLimitResult::RetryAt(retry_at, _) => {
|
RedisRateLimitResult::RetryAt(retry_at, _) => {
|
||||||
// TODO: check timeouts
|
|
||||||
if wait_until < retry_at {
|
if wait_until < retry_at {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1086,7 +1081,7 @@ impl Web3Rpc {
|
||||||
"retry_at is in the past {}s",
|
"retry_at is in the past {}s",
|
||||||
retry_at.elapsed().as_secs_f32()
|
retry_at.elapsed().as_secs_f32()
|
||||||
);
|
);
|
||||||
sleep(Duration::from_secs(1)).await;
|
sleep(Duration::from_millis(200)).await;
|
||||||
} else {
|
} else {
|
||||||
sleep_until(retry_at).await;
|
sleep_until(retry_at).await;
|
||||||
}
|
}
|
||||||
|
@ -1162,7 +1157,6 @@ impl Web3Rpc {
|
||||||
}
|
}
|
||||||
|
|
||||||
// make sure this block has the oldest block that this request needs
|
// make sure this block has the oldest block that this request needs
|
||||||
// TODO: should this check be optional? we've probably already done it for RpcForRuest::inner. for now its fine to duplicate the check
|
|
||||||
if let Some(block_needed) = web3_request.min_block_needed() {
|
if let Some(block_needed) = web3_request.min_block_needed() {
|
||||||
if !self.has_block_data(block_needed) {
|
if !self.has_block_data(block_needed) {
|
||||||
trace!(%web3_request, %block_needed, "{} cannot serve this request. Missing min block", self);
|
trace!(%web3_request, %block_needed, "{} cannot serve this request. Missing min block", self);
|
||||||
|
@ -1171,51 +1165,72 @@ impl Web3Rpc {
|
||||||
}
|
}
|
||||||
|
|
||||||
// make sure this block has the newest block that this request needs
|
// make sure this block has the newest block that this request needs
|
||||||
// TODO: should this check be optional? we've probably already done it for RpcForRuest::inner. for now its fine to duplicate the check
|
|
||||||
if let Some(block_needed) = web3_request.max_block_needed() {
|
if let Some(block_needed) = web3_request.max_block_needed() {
|
||||||
if !self.has_block_data(block_needed) {
|
if !self.has_block_data(block_needed) {
|
||||||
trace!(%web3_request, %block_needed, "{} cannot serve this request. Missing max block", self);
|
trace!(%web3_request, %block_needed, "{} cannot serve this request. Missing max block", self);
|
||||||
|
|
||||||
let clone = self.clone();
|
let rpc = self.clone();
|
||||||
let connect_timeout_at = web3_request.connect_timeout_at();
|
let connect_timeout_at = web3_request.connect_timeout_at();
|
||||||
|
|
||||||
|
let mut head_block_receiver =
|
||||||
|
self.head_block_sender.as_ref().unwrap().subscribe();
|
||||||
|
|
||||||
|
// if head_block is far behind block_needed, return now
|
||||||
|
// TODO: future block limit from the config
|
||||||
|
if let Some(head_block) = head_block_receiver.borrow_and_update().as_ref() {
|
||||||
|
let head_block_number = head_block.number();
|
||||||
|
|
||||||
|
if head_block_number + U64::from(5) < block_needed {
|
||||||
|
return Err(Web3ProxyError::FarFutureBlock {
|
||||||
|
head: Some(head_block_number),
|
||||||
|
requested: block_needed,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return Ok(OpenRequestResult::Failed);
|
||||||
|
}
|
||||||
|
|
||||||
// create a future that resolves once this rpc can serve this request
|
// create a future that resolves once this rpc can serve this request
|
||||||
// TODO: i don't love this future. think about it more
|
// TODO: i don't love this future. think about it more
|
||||||
let synced_f = async move {
|
let synced_f = async move {
|
||||||
let mut head_block_receiver =
|
|
||||||
clone.head_block_sender.as_ref().unwrap().subscribe();
|
|
||||||
|
|
||||||
// TODO: if head_block is far behind block_needed, retrurn now
|
|
||||||
head_block_receiver.borrow_and_update();
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
select! {
|
select! {
|
||||||
_ = head_block_receiver.changed() => {
|
_ = head_block_receiver.changed() => {
|
||||||
if let Some(head_block_number) = head_block_receiver.borrow_and_update().as_ref().map(|x| x.number()) {
|
let head_block = head_block_receiver.borrow_and_update();
|
||||||
|
|
||||||
|
if let Some(head_block_number) = head_block.as_ref().map(|x| x.number()) {
|
||||||
if head_block_number >= block_needed {
|
if head_block_number >= block_needed {
|
||||||
trace!("the block we needed has arrived!");
|
trace!("the block we needed has arrived!");
|
||||||
break;
|
return Ok(rpc);
|
||||||
}
|
|
||||||
// wait up to 2 blocks
|
|
||||||
// TODO: configurable wait per chain
|
|
||||||
if head_block_number + U64::from(2) < block_needed {
|
|
||||||
return Err(Web3ProxyError::FarFutureBlock { head: head_block_number, requested: block_needed });
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// TODO: what should we do? this server has no blocks at all. we can wait, but i think exiting now is best
|
// TODO: what should we do? this server has no blocks at all. we can wait, but i think exiting now is best
|
||||||
// yield_now().await;
|
error!("no head block during try_request_handle on {}", rpc);
|
||||||
error!("no head block during try_request_handle on {}", clone);
|
break;
|
||||||
return Err(Web3ProxyError::NoServersSynced);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ = sleep_until(connect_timeout_at) => {
|
_ = sleep_until(connect_timeout_at) => {
|
||||||
error!("connection timeout on {}", clone);
|
error!("connection timeout on {}", rpc);
|
||||||
return Err(Web3ProxyError::NoServersSynced);
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(clone)
|
if let Some(head_block_number) = head_block_receiver
|
||||||
|
.borrow_and_update()
|
||||||
|
.as_ref()
|
||||||
|
.map(|x| x.number())
|
||||||
|
{
|
||||||
|
Err(Web3ProxyError::FarFutureBlock {
|
||||||
|
head: Some(head_block_number),
|
||||||
|
requested: block_needed,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
Err(Web3ProxyError::FarFutureBlock {
|
||||||
|
head: None,
|
||||||
|
requested: block_needed,
|
||||||
|
})
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
return Ok(OpenRequestResult::Lagged(Box::pin(synced_f)));
|
return Ok(OpenRequestResult::Lagged(Box::pin(synced_f)));
|
||||||
|
|
Loading…
Reference in New Issue