clean up some selects
This commit is contained in:
parent
e70eeb11e0
commit
8aac4779bf
|
@ -544,6 +544,7 @@ impl Web3ProxyApp {
|
||||||
_ = new_top_config_receiver.changed() => {}
|
_ = new_top_config_receiver.changed() => {}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
// configs applied successfully. wait for configs to change or for the app to exit
|
||||||
select! {
|
select! {
|
||||||
_ = config_watcher_shutdown_receiver.recv() => {
|
_ = config_watcher_shutdown_receiver.recv() => {
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -34,6 +34,7 @@ use std::net::IpAddr;
|
||||||
use std::str::from_utf8_mut;
|
use std::str::from_utf8_mut;
|
||||||
use std::sync::atomic::AtomicU64;
|
use std::sync::atomic::AtomicU64;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use tokio::select;
|
||||||
use tokio::sync::{broadcast, mpsc, OwnedSemaphorePermit, RwLock as AsyncRwLock};
|
use tokio::sync::{broadcast, mpsc, OwnedSemaphorePermit, RwLock as AsyncRwLock};
|
||||||
use tracing::trace;
|
use tracing::trace;
|
||||||
|
|
||||||
|
@ -465,7 +466,7 @@ async fn read_web3_socket(
|
||||||
let (close_sender, mut close_receiver) = broadcast::channel(1);
|
let (close_sender, mut close_receiver) = broadcast::channel(1);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
select! {
|
||||||
msg = ws_rx.next() => {
|
msg = ws_rx.next() => {
|
||||||
if let Some(Ok(msg)) = msg {
|
if let Some(Ok(msg)) = msg {
|
||||||
// clone things so we can handle multiple messages in parallel
|
// clone things so we can handle multiple messages in parallel
|
||||||
|
|
|
@ -1010,7 +1010,7 @@ impl Web3Rpcs {
|
||||||
request_metadata.no_servers.fetch_add(1, Ordering::AcqRel);
|
request_metadata.no_servers.fetch_add(1, Ordering::AcqRel);
|
||||||
}
|
}
|
||||||
|
|
||||||
tokio::select! {
|
select! {
|
||||||
_ = sleep_until(retry_at) => {
|
_ = sleep_until(retry_at) => {
|
||||||
trace!("slept!");
|
trace!("slept!");
|
||||||
skip_rpcs.pop();
|
skip_rpcs.pop();
|
||||||
|
@ -1191,7 +1191,7 @@ impl Web3Rpcs {
|
||||||
break;
|
break;
|
||||||
};
|
};
|
||||||
|
|
||||||
tokio::select! {
|
select! {
|
||||||
_ = sleep_until(max_sleep) => {
|
_ = sleep_until(max_sleep) => {
|
||||||
// rpcs didn't change and we have waited too long. break to return an error
|
// rpcs didn't change and we have waited too long. break to return an error
|
||||||
warn!(?self, "timeout waiting for try_send_all_synced_connections!");
|
warn!(?self, "timeout waiting for try_send_all_synced_connections!");
|
||||||
|
@ -1223,11 +1223,21 @@ impl Web3Rpcs {
|
||||||
|
|
||||||
// TODO: only make one of these sleep_untils
|
// TODO: only make one of these sleep_untils
|
||||||
|
|
||||||
tokio::select! {
|
let break_at = start + max_wait;
|
||||||
_ = sleep_until(start + max_wait) => {break}
|
|
||||||
_ = sleep_until(retry_at) => {}
|
if break_at <= retry_at {
|
||||||
_ = watch_consensus_rpcs.changed() => {
|
select! {
|
||||||
watch_consensus_rpcs.borrow_and_update();
|
_ = sleep_until(break_at) => {break}
|
||||||
|
_ = watch_consensus_rpcs.changed() => {
|
||||||
|
watch_consensus_rpcs.borrow_and_update();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
select! {
|
||||||
|
_ = sleep_until(retry_at) => {}
|
||||||
|
_ = watch_consensus_rpcs.changed() => {
|
||||||
|
watch_consensus_rpcs.borrow_and_update();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -10,6 +10,7 @@ use futures::stream;
|
||||||
use hashbrown::HashMap;
|
use hashbrown::HashMap;
|
||||||
use migration::sea_orm::prelude::Decimal;
|
use migration::sea_orm::prelude::Decimal;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
use tokio::select;
|
||||||
use tokio::sync::{broadcast, mpsc, oneshot};
|
use tokio::sync::{broadcast, mpsc, oneshot};
|
||||||
use tokio::time::{interval, sleep};
|
use tokio::time::{interval, sleep};
|
||||||
use tracing::{debug, error, info, trace, warn, Instrument};
|
use tracing::{debug, error, info, trace, warn, Instrument};
|
||||||
|
@ -140,7 +141,7 @@ impl StatBuffer {
|
||||||
let mut db_frontend_requests = 0;
|
let mut db_frontend_requests = 0;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
select! {
|
||||||
stat = stat_receiver.recv() => {
|
stat = stat_receiver.recv() => {
|
||||||
if let Some(stat) = stat {
|
if let Some(stat) = stat {
|
||||||
total_frontend_requests += self._buffer_app_stat(stat).await?;
|
total_frontend_requests += self._buffer_app_stat(stat).await?;
|
||||||
|
|
|
@ -165,9 +165,7 @@ impl ProxydSubCommand {
|
||||||
return Err(anyhow::anyhow!("oh no! we never got a head block!"))
|
return Err(anyhow::anyhow!("oh no! we never got a head block!"))
|
||||||
}
|
}
|
||||||
_ = head_block_receiver.changed() => {
|
_ = head_block_receiver.changed() => {
|
||||||
if let Some(head_block) = spawned_app
|
if let Some(head_block) = head_block_receiver
|
||||||
.app
|
|
||||||
.head_block_receiver()
|
|
||||||
.borrow_and_update()
|
.borrow_and_update()
|
||||||
.as_ref()
|
.as_ref()
|
||||||
{
|
{
|
||||||
|
@ -195,7 +193,7 @@ impl ProxydSubCommand {
|
||||||
// if everything is working, these should all run forever
|
// if everything is working, these should all run forever
|
||||||
let mut exited_with_err = false;
|
let mut exited_with_err = false;
|
||||||
let mut frontend_exited = false;
|
let mut frontend_exited = false;
|
||||||
tokio::select! {
|
select! {
|
||||||
x = flatten_handles(spawned_app.app_handles) => {
|
x = flatten_handles(spawned_app.app_handles) => {
|
||||||
match x {
|
match x {
|
||||||
Ok(_) => info!("app_handle exited"),
|
Ok(_) => info!("app_handle exited"),
|
||||||
|
@ -245,8 +243,6 @@ impl ProxydSubCommand {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// TODO: This seems to have been removed on the main branch
|
|
||||||
// TODO: how can we properly watch background handles here? this returns None immediatly and the app exits. i think the bug is somewhere else though
|
|
||||||
x = spawned_app.background_handles.next() => {
|
x = spawned_app.background_handles.next() => {
|
||||||
match x {
|
match x {
|
||||||
Some(Ok(_)) => info!("quiting from background handles"),
|
Some(Ok(_)) => info!("quiting from background handles"),
|
||||||
|
|
Loading…
Reference in New Issue