try yielding around all selects

This commit is contained in:
Bryan Stitt 2023-08-04 18:37:46 -07:00
parent 5257329559
commit aed98ac034
5 changed files with 18 additions and 1 deletions

@ -52,7 +52,7 @@ use std::sync::{atomic, Arc};
use std::time::Duration; use std::time::Duration;
use tokio::select; use tokio::select;
use tokio::sync::{broadcast, mpsc, oneshot, watch, Semaphore}; use tokio::sync::{broadcast, mpsc, oneshot, watch, Semaphore};
use tokio::task::JoinHandle; use tokio::task::{JoinHandle, yield_now};
use tokio::time::{sleep, timeout}; use tokio::time::{sleep, timeout};
use tracing::{error, info, trace, warn, Level}; use tracing::{error, info, trace, warn, Level};
@ -540,6 +540,7 @@ impl Web3ProxyApp {
_ = new_top_config_receiver.changed() => {} _ = new_top_config_receiver.changed() => {}
} }
} }
yield_now().await;
} }
Ok(()) Ok(())

@ -36,6 +36,7 @@ use std::sync::atomic::AtomicU64;
use std::sync::Arc; use std::sync::Arc;
use tokio::select; use tokio::select;
use tokio::sync::{broadcast, mpsc, OwnedSemaphorePermit, RwLock as AsyncRwLock}; use tokio::sync::{broadcast, mpsc, OwnedSemaphorePermit, RwLock as AsyncRwLock};
use tokio::task::yield_now;
use tracing::trace; use tracing::trace;
/// How to select backend servers for a request /// How to select backend servers for a request
@ -549,6 +550,8 @@ async fn read_web3_socket(
}; };
tokio::spawn(f); tokio::spawn(f);
yield_now().await;
} else { } else {
break; break;
} }

@ -31,6 +31,7 @@ use std::sync::atomic::Ordering;
use std::sync::Arc; use std::sync::Arc;
use tokio::select; use tokio::select;
use tokio::sync::{mpsc, watch}; use tokio::sync::{mpsc, watch};
use tokio::task::yield_now;
use tokio::time::{sleep, sleep_until, timeout, Duration, Instant}; use tokio::time::{sleep, sleep_until, timeout, Duration, Instant};
use tracing::{debug, error, info, instrument, trace, warn}; use tracing::{debug, error, info, instrument, trace, warn};
@ -618,6 +619,7 @@ impl Web3Rpcs {
}, },
_ = sleep_until(start + max_wait) => break, _ = sleep_until(start + max_wait) => break,
} }
yield_now().await;
} else { } else {
trace!("no potential rpcs and set to not wait"); trace!("no potential rpcs and set to not wait");
break; break;
@ -985,9 +987,12 @@ impl Web3Rpcs {
skip_rpcs.pop(); skip_rpcs.pop();
} }
_ = watch_consensus_rpcs.changed() => { _ = watch_consensus_rpcs.changed() => {
// we don't actually care what they are now. we just care that they changed
watch_consensus_rpcs.borrow_and_update(); watch_consensus_rpcs.borrow_and_update();
} }
} }
yield_now().await;
} }
OpenRequestResult::NotReady => { OpenRequestResult::NotReady => {
if let Some(request_metadata) = request_metadata { if let Some(request_metadata) = request_metadata {
@ -1174,6 +1179,9 @@ impl Web3Rpcs {
_ = watch_consensus_rpcs.changed() => { _ = watch_consensus_rpcs.changed() => {
// consensus rpcs changed! // consensus rpcs changed!
watch_consensus_rpcs.borrow_and_update(); watch_consensus_rpcs.borrow_and_update();
yield_now().await;
// continue to try again // continue to try again
continue; continue;
} }
@ -1215,6 +1223,8 @@ impl Web3Rpcs {
} }
} }
yield_now().await;
continue; continue;
} else { } else {
warn!(?self, "all rate limits exceeded"); warn!(?self, "all rate limits exceeded");

@ -12,6 +12,7 @@ use migration::sea_orm::prelude::Decimal;
use std::time::Duration; use std::time::Duration;
use tokio::select; use tokio::select;
use tokio::sync::{broadcast, mpsc, oneshot}; use tokio::sync::{broadcast, mpsc, oneshot};
use tokio::task::yield_now;
use tokio::time::{interval, sleep}; use tokio::time::{interval, sleep};
use tracing::{debug, error, info, trace, warn, Instrument}; use tracing::{debug, error, info, trace, warn, Instrument};
@ -196,6 +197,7 @@ impl StatBuffer {
break; break;
} }
} }
yield_now().await;
} }
// TODO: wait on all websockets to close // TODO: wait on all websockets to close

@ -171,6 +171,7 @@ impl ProxydSubCommand {
} }
} }
} }
yield_now().await;
} }
// start the frontend port // start the frontend port