add another minimum sleep and set missed tick behavior

This commit is contained in:
Bryan Stitt 2023-10-11 01:42:55 -07:00
parent 15216b7d33
commit 072c52050f
5 changed files with 26 additions and 31 deletions

View File

@ -872,7 +872,7 @@ impl RpcsForRequest {
if self.request.connect_timeout() {
break;
} else {
yield_now().await;
// yield_now().await;
}
let mut earliest_retry_at = None;

View File

@ -409,7 +409,7 @@ impl Web3Rpcs {
};
if next_try > web3_request.connect_timeout_at() {
let retry_in = Instant::now().duration_since(next_try).as_secs();
let retry_in = Instant::now().duration_since(next_try).as_secs().min(1);
// we don't use Web3ProxyError::RateLimited because that is for the user being rate limited
return Err(Web3ProxyError::StatusCode(

View File

@ -816,13 +816,13 @@ impl Web3Rpc {
select! {
x = subscribe_stop_rx.changed() => {
if x.is_err() || *subscribe_stop_rx.borrow_and_update() {
if *subscribe_stop_rx.borrow_and_update() || x.is_err() {
trace!(%rpc, "stopping http block subscription");
break;
}
}
_ = sleep(Duration::from_secs(health_sleep_seconds)) => {
// continue
// time for another health check
}
}
}
@ -948,7 +948,7 @@ impl Web3Rpc {
loop {
select! {
x = subscribe_stop_rx.changed() => {
if x.is_err() || *subscribe_stop_rx.borrow_and_update() {
if *subscribe_stop_rx.borrow_and_update() || x.is_err() {
break;
}
}
@ -1017,7 +1017,7 @@ impl Web3Rpc {
loop {
select! {
x = subscribe_stop_rx.changed() => {
if x.is_err() || *subscribe_stop_rx.borrow_and_update() {
if *subscribe_stop_rx.borrow_and_update() || x.is_err() {
trace!(%self, "stopping websocket block subscription");
break;
}
@ -1034,7 +1034,7 @@ impl Web3Rpc {
// there is a "watch_blocks" function, but a lot of public nodes (including llamanodes) do not support the necessary rpc endpoints
// TODO: is 1/2 the block time okay?
let mut i = interval(self.block_interval / 2);
i.set_missed_tick_behavior(MissedTickBehavior::Delay);
i.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
let block_result = self
@ -1052,7 +1052,7 @@ impl Web3Rpc {
// TODO: should this select be at the start or end of the loop?
select! {
x = subscribe_stop_rx.changed() => {
if x.is_err() || *subscribe_stop_rx.borrow_and_update() {
if *subscribe_stop_rx.borrow_and_update() || x.is_err() {
trace!(%self, "stopping http block subscription");
break;
}
@ -1102,7 +1102,7 @@ impl Web3Rpc {
debug_assert!(wait > Duration::from_secs(0));
// TODO: have connect_timeout in addition to the full ttl
if retry_at > web3_request.expire_at() {
if retry_at > web3_request.connect_timeout_at() {
// break now since we will wait past our maximum wait time
return Err(Web3ProxyError::Timeout(Some(
web3_request.start_instant.elapsed(),
@ -1114,32 +1114,14 @@ impl Web3Rpc {
Ok(OpenRequestResult::Lagged(now_synced_f)) => {
select! {
_ = now_synced_f => {}
_ = sleep_until(web3_request.expire_at()) => {
_ = sleep_until(web3_request.connect_timeout_at()) => {
break;
}
}
}
Ok(OpenRequestResult::Failed) => {
// TODO: when can this happen? log? emit a stat?
// TODO: when can this happen? log? emit a stat? is breaking the right thing to do?
trace!("{} has no handle ready", self);
// if head_block_sender.is_none() {
// head_block_sender = self.head_block_sender.as_ref().map(|x| x.subscribe());
// }
// if let Some(head_block_sender) = &mut head_block_sender {
// select! {
// _ = head_block_sender.changed() => {
// head_block_sender.borrow_and_update();
// }
// _ = sleep_until(web3_request.expire_at()) => {
// break;
// }
// }
// } else {
// break;
// }
break;
}
Err(err) => return Err(err),
@ -1159,7 +1141,16 @@ impl Web3Rpc {
break;
}
sleep_until(wait_until).await;
if retry_at < Instant::now() {
// TODO: think more about this
error!(
"retry_at is in the past {}s",
retry_at.elapsed().as_secs_f32()
);
sleep(Duration::from_secs(1)).await;
} else {
sleep_until(retry_at).await;
}
}
RedisRateLimitResult::RetryNever => {
// TODO: not sure what this should be

View File

@ -136,6 +136,10 @@ impl StatBuffer {
let mut db_save_interval =
interval(Duration::from_secs(self.db_save_interval_seconds as u64));
// todo: what behavior?
db_save_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
tsdb_save_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
// TODO: this should be a FlushedStats that we add to
let mut total_requests = 0;
let mut tsdb_frontend_requests = 0;

View File

@ -276,7 +276,7 @@ where
let mut interval = interval(Duration::from_secs(seconds));
// TODO: should we warn if there are delays?
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
interval.tick().await;