From 072c52050f29f3b0d6ef3591776ba102144de369 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 11 Oct 2023 01:42:55 -0700 Subject: [PATCH] add another minimum sleep and set missed tick behavior --- web3_proxy/src/rpcs/consensus.rs | 2 +- web3_proxy/src/rpcs/many.rs | 2 +- web3_proxy/src/rpcs/one.rs | 47 ++++++++----------- web3_proxy/src/stats/stat_buffer.rs | 4 ++ .../src/sub_commands/sentryd/mod.rs | 2 +- 5 files changed, 26 insertions(+), 31 deletions(-) diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index c60a4c51..a15a60bb 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -872,7 +872,7 @@ impl RpcsForRequest { if self.request.connect_timeout() { break; } else { - yield_now().await; + // yield_now().await; } let mut earliest_retry_at = None; diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index a0870eeb..c4c5c52b 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -409,7 +409,7 @@ impl Web3Rpcs { }; if next_try > web3_request.connect_timeout_at() { - let retry_in = Instant::now().duration_since(next_try).as_secs(); + let retry_in = Instant::now().duration_since(next_try).as_secs().min(1); // we don't use Web3ProxyError::RateLimited because that is for the user being rate limited return Err(Web3ProxyError::StatusCode( diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index e783b3c5..2e5b3802 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -816,13 +816,13 @@ impl Web3Rpc { select! { x = subscribe_stop_rx.changed() => { - if x.is_err() || *subscribe_stop_rx.borrow_and_update() { + if *subscribe_stop_rx.borrow_and_update() || x.is_err() { trace!(%rpc, "stopping http block subscription"); break; } } _ = sleep(Duration::from_secs(health_sleep_seconds)) => { - // continue + // time for another health check } } } @@ -948,7 +948,7 @@ impl Web3Rpc { loop { select! { x = subscribe_stop_rx.changed() => { - if x.is_err() || *subscribe_stop_rx.borrow_and_update() { + if *subscribe_stop_rx.borrow_and_update() || x.is_err() { break; } } @@ -1017,7 +1017,7 @@ impl Web3Rpc { loop { select! { x = subscribe_stop_rx.changed() => { - if x.is_err() || *subscribe_stop_rx.borrow_and_update() { + if *subscribe_stop_rx.borrow_and_update() || x.is_err() { trace!(%self, "stopping websocket block subscription"); break; } @@ -1034,7 +1034,7 @@ impl Web3Rpc { // there is a "watch_blocks" function, but a lot of public nodes (including llamanodes) do not support the necessary rpc endpoints // TODO: is 1/2 the block time okay? let mut i = interval(self.block_interval / 2); - i.set_missed_tick_behavior(MissedTickBehavior::Delay); + i.set_missed_tick_behavior(MissedTickBehavior::Skip); loop { let block_result = self @@ -1052,7 +1052,7 @@ impl Web3Rpc { // TODO: should this select be at the start or end of the loop? select! { x = subscribe_stop_rx.changed() => { - if x.is_err() || *subscribe_stop_rx.borrow_and_update() { + if *subscribe_stop_rx.borrow_and_update() || x.is_err() { trace!(%self, "stopping http block subscription"); break; } @@ -1102,7 +1102,7 @@ impl Web3Rpc { debug_assert!(wait > Duration::from_secs(0)); // TODO: have connect_timeout in addition to the full ttl - if retry_at > web3_request.expire_at() { + if retry_at > web3_request.connect_timeout_at() { // break now since we will wait past our maximum wait time return Err(Web3ProxyError::Timeout(Some( web3_request.start_instant.elapsed(), @@ -1114,32 +1114,14 @@ impl Web3Rpc { Ok(OpenRequestResult::Lagged(now_synced_f)) => { select! { _ = now_synced_f => {} - _ = sleep_until(web3_request.expire_at()) => { + _ = sleep_until(web3_request.connect_timeout_at()) => { break; } } } Ok(OpenRequestResult::Failed) => { - // TODO: when can this happen? log? emit a stat? + // TODO: when can this happen? log? emit a stat? is breaking the right thing to do? trace!("{} has no handle ready", self); - - // if head_block_sender.is_none() { - // head_block_sender = self.head_block_sender.as_ref().map(|x| x.subscribe()); - // } - - // if let Some(head_block_sender) = &mut head_block_sender { - // select! { - // _ = head_block_sender.changed() => { - // head_block_sender.borrow_and_update(); - // } - // _ = sleep_until(web3_request.expire_at()) => { - // break; - // } - // } - // } else { - // break; - // } - break; } Err(err) => return Err(err), @@ -1159,7 +1141,16 @@ impl Web3Rpc { break; } - sleep_until(wait_until).await; + if retry_at < Instant::now() { + // TODO: think more about this + error!( + "retry_at is in the past {}s", + retry_at.elapsed().as_secs_f32() + ); + sleep(Duration::from_secs(1)).await; + } else { + sleep_until(retry_at).await; + } } RedisRateLimitResult::RetryNever => { // TODO: not sure what this should be diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs index 5d376fd3..be0d2e53 100644 --- a/web3_proxy/src/stats/stat_buffer.rs +++ b/web3_proxy/src/stats/stat_buffer.rs @@ -136,6 +136,10 @@ impl StatBuffer { let mut db_save_interval = interval(Duration::from_secs(self.db_save_interval_seconds as u64)); + // todo: what behavior? + db_save_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + tsdb_save_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + // TODO: this should be a FlushedStats that we add to let mut total_requests = 0; let mut tsdb_frontend_requests = 0; diff --git a/web3_proxy_cli/src/sub_commands/sentryd/mod.rs b/web3_proxy_cli/src/sub_commands/sentryd/mod.rs index 66fbf09e..61a1afe2 100644 --- a/web3_proxy_cli/src/sub_commands/sentryd/mod.rs +++ b/web3_proxy_cli/src/sub_commands/sentryd/mod.rs @@ -276,7 +276,7 @@ where let mut interval = interval(Duration::from_secs(seconds)); // TODO: should we warn if there are delays? - interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); loop { interval.tick().await;