diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 298e4c1d..5f64eb19 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -544,6 +544,7 @@ impl Web3ProxyApp { _ = new_top_config_receiver.changed() => {} } } else { + // configs applied successfully. wait for configs to change or for the app to exit select! { _ = config_watcher_shutdown_receiver.recv() => { break; diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index 94643865..659b0e9e 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -34,6 +34,7 @@ use std::net::IpAddr; use std::str::from_utf8_mut; use std::sync::atomic::AtomicU64; use std::sync::Arc; +use tokio::select; use tokio::sync::{broadcast, mpsc, OwnedSemaphorePermit, RwLock as AsyncRwLock}; use tracing::trace; @@ -465,7 +466,7 @@ async fn read_web3_socket( let (close_sender, mut close_receiver) = broadcast::channel(1); loop { - tokio::select! { + select! { msg = ws_rx.next() => { if let Some(Ok(msg)) = msg { // clone things so we can handle multiple messages in parallel diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index c3df64a9..bc9441d9 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -1010,7 +1010,7 @@ impl Web3Rpcs { request_metadata.no_servers.fetch_add(1, Ordering::AcqRel); } - tokio::select! { + select! { _ = sleep_until(retry_at) => { trace!("slept!"); skip_rpcs.pop(); @@ -1191,7 +1191,7 @@ impl Web3Rpcs { break; }; - tokio::select! { + select! { _ = sleep_until(max_sleep) => { // rpcs didn't change and we have waited too long. break to return an error warn!(?self, "timeout waiting for try_send_all_synced_connections!"); @@ -1223,11 +1223,21 @@ impl Web3Rpcs { // TODO: only make one of these sleep_untils - tokio::select! { - _ = sleep_until(start + max_wait) => {break} - _ = sleep_until(retry_at) => {} - _ = watch_consensus_rpcs.changed() => { - watch_consensus_rpcs.borrow_and_update(); + let break_at = start + max_wait; + + if break_at <= retry_at { + select! { + _ = sleep_until(break_at) => {break} + _ = watch_consensus_rpcs.changed() => { + watch_consensus_rpcs.borrow_and_update(); + } + } + } else { + select! { + _ = sleep_until(retry_at) => {} + _ = watch_consensus_rpcs.changed() => { + watch_consensus_rpcs.borrow_and_update(); + } } } diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs index 4694f56a..7127bfde 100644 --- a/web3_proxy/src/stats/stat_buffer.rs +++ b/web3_proxy/src/stats/stat_buffer.rs @@ -10,6 +10,7 @@ use futures::stream; use hashbrown::HashMap; use migration::sea_orm::prelude::Decimal; use std::time::Duration; +use tokio::select; use tokio::sync::{broadcast, mpsc, oneshot}; use tokio::time::{interval, sleep}; use tracing::{debug, error, info, trace, warn, Instrument}; @@ -140,7 +141,7 @@ impl StatBuffer { let mut db_frontend_requests = 0; loop { - tokio::select! { + select! { stat = stat_receiver.recv() => { if let Some(stat) = stat { total_frontend_requests += self._buffer_app_stat(stat).await?; diff --git a/web3_proxy/src/sub_commands/proxyd.rs b/web3_proxy/src/sub_commands/proxyd.rs index 39a1793c..d1b00b13 100644 --- a/web3_proxy/src/sub_commands/proxyd.rs +++ b/web3_proxy/src/sub_commands/proxyd.rs @@ -165,9 +165,7 @@ impl ProxydSubCommand { return Err(anyhow::anyhow!("oh no! we never got a head block!")) } _ = head_block_receiver.changed() => { - if let Some(head_block) = spawned_app - .app - .head_block_receiver() + if let Some(head_block) = head_block_receiver .borrow_and_update() .as_ref() { @@ -195,7 +193,7 @@ impl ProxydSubCommand { // if everything is working, these should all run forever let mut exited_with_err = false; let mut frontend_exited = false; - tokio::select! { + select! { x = flatten_handles(spawned_app.app_handles) => { match x { Ok(_) => info!("app_handle exited"), @@ -245,8 +243,6 @@ impl ProxydSubCommand { } } } - // TODO: This seems to have been removed on the main branch - // TODO: how can we properly watch background handles here? this returns None immediatly and the app exits. i think the bug is somewhere else though x = spawned_app.background_handles.next() => { match x { Some(Ok(_)) => info!("quiting from background handles"),