diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index e66bb77a..9bf76dcb 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -52,7 +52,7 @@ use std::sync::{atomic, Arc}; use std::time::Duration; use tokio::select; use tokio::sync::{broadcast, mpsc, oneshot, watch, Semaphore}; -use tokio::task::JoinHandle; +use tokio::task::{JoinHandle, yield_now}; use tokio::time::{sleep, timeout}; use tracing::{error, info, trace, warn, Level}; @@ -540,6 +540,7 @@ impl Web3ProxyApp { _ = new_top_config_receiver.changed() => {} } } + yield_now().await; } Ok(()) diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index 659b0e9e..c72909e6 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -36,6 +36,7 @@ use std::sync::atomic::AtomicU64; use std::sync::Arc; use tokio::select; use tokio::sync::{broadcast, mpsc, OwnedSemaphorePermit, RwLock as AsyncRwLock}; +use tokio::task::yield_now; use tracing::trace; /// How to select backend servers for a request @@ -549,6 +550,8 @@ async fn read_web3_socket( }; tokio::spawn(f); + + yield_now().await; } else { break; } diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 749f1ad0..0b463fee 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -31,6 +31,7 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use tokio::select; use tokio::sync::{mpsc, watch}; +use tokio::task::yield_now; use tokio::time::{sleep, sleep_until, timeout, Duration, Instant}; use tracing::{debug, error, info, instrument, trace, warn}; @@ -618,6 +619,7 @@ impl Web3Rpcs { }, _ = sleep_until(start + max_wait) => break, } + yield_now().await; } else { trace!("no potential rpcs and set to not wait"); break; @@ -985,9 +987,12 @@ impl Web3Rpcs { skip_rpcs.pop(); } _ = watch_consensus_rpcs.changed() => { + // we don't actually care what they are now. we just care that they changed watch_consensus_rpcs.borrow_and_update(); + } } + yield_now().await; } OpenRequestResult::NotReady => { if let Some(request_metadata) = request_metadata { @@ -1174,6 +1179,9 @@ impl Web3Rpcs { _ = watch_consensus_rpcs.changed() => { // consensus rpcs changed! watch_consensus_rpcs.borrow_and_update(); + + yield_now().await; + // continue to try again continue; } @@ -1215,6 +1223,8 @@ impl Web3Rpcs { } } + yield_now().await; + continue; } else { warn!(?self, "all rate limits exceeded"); diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs index 7127bfde..07229452 100644 --- a/web3_proxy/src/stats/stat_buffer.rs +++ b/web3_proxy/src/stats/stat_buffer.rs @@ -12,6 +12,7 @@ use migration::sea_orm::prelude::Decimal; use std::time::Duration; use tokio::select; use tokio::sync::{broadcast, mpsc, oneshot}; +use tokio::task::yield_now; use tokio::time::{interval, sleep}; use tracing::{debug, error, info, trace, warn, Instrument}; @@ -196,6 +197,7 @@ impl StatBuffer { break; } } + yield_now().await; } // TODO: wait on all websockets to close diff --git a/web3_proxy_cli/src/sub_commands/proxyd.rs b/web3_proxy_cli/src/sub_commands/proxyd.rs index ee1cdb0a..a82e8d02 100644 --- a/web3_proxy_cli/src/sub_commands/proxyd.rs +++ b/web3_proxy_cli/src/sub_commands/proxyd.rs @@ -171,6 +171,7 @@ impl ProxydSubCommand { } } } + yield_now().await; } // start the frontend port