more yields and less timeouts

This commit is contained in:
Bryan Stitt 2023-08-04 18:54:16 -07:00
parent aed98ac034
commit bc8040f61d
6 changed files with 25 additions and 29 deletions

View File

@ -52,7 +52,7 @@ use std::sync::{atomic, Arc};
use std::time::Duration;
use tokio::select;
use tokio::sync::{broadcast, mpsc, oneshot, watch, Semaphore};
use tokio::task::{JoinHandle, yield_now};
use tokio::task::{yield_now, JoinHandle};
use tokio::time::{sleep, timeout};
use tracing::{error, info, trace, warn, Level};
@ -1190,7 +1190,9 @@ impl Web3ProxyApp {
tries += 1;
if tries < max_tries {
// try again
continue
yield_now().await;
continue;
}
request_metadata
@ -1211,7 +1213,7 @@ impl Web3ProxyApp {
// there might be clones in the background, so this isn't a sure thing
let _ = request_metadata.try_send_arc_stat();
return (code, response, rpcs)
return (code, response, rpcs);
}
}
@ -1431,7 +1433,6 @@ impl Web3ProxyApp {
method,
params,
Some(request_metadata),
Some(Duration::from_secs(30)),
// TODO: should this be block 0 instead?
Some(&U64::one()),
@ -1705,9 +1706,6 @@ impl Web3ProxyApp {
}
};
// TODO: different timeouts for different user tiers. get the duration out of the request_metadata
let backend_request_timetout = Duration::from_secs(240);
if let Some(cache_key) = cache_key {
let from_block_num = cache_key.from_block_num().copied();
let to_block_num = cache_key.to_block_num().copied();
@ -1721,20 +1719,17 @@ impl Web3ProxyApp {
self
.jsonrpc_response_cache
.try_get_with::<_, Web3ProxyError>(cache_key.hash(), async {
// TODO: think more about this timeout and test that it works well!
let response_data = timeout(
backend_request_timetout + Duration::from_millis(100),
self.balanced_rpcs
// TODO: think more about this timeout. we should probably have a `request_expires_at` Duration on the request_metadata
let response_data = self.balanced_rpcs
.try_proxy_connection::<_, Arc<RawValue>>(
method,
params,
Some(request_metadata),
Some(backend_request_timetout),
Some(Duration::from_secs(240)),
from_block_num.as_ref(),
to_block_num.as_ref(),
))
.await?;
)
.await;
if !cache_jsonrpc_errors && let Err(err) = response_data {
// if we are not supposed to cache jsonrpc errors,
@ -1758,20 +1753,16 @@ impl Web3ProxyApp {
}
}).await?
} else {
let x = timeout(
backend_request_timetout + Duration::from_millis(100),
self.balanced_rpcs
let x = self.balanced_rpcs
.try_proxy_connection::<_, Arc<RawValue>>(
method,
params,
Some(request_metadata),
Some(backend_request_timetout),
Some(Duration::from_secs(240)),
None,
None,
)
)
.await??;
.await?;
x.into()
}

View File

@ -205,7 +205,8 @@ impl Web3Rpcs {
num: parent_num,
hash: *block.parent_hash(),
};
loop {
// TODO: smarter max loop on this
for _ in 0..16 {
let ancestor_number_to_hash_entry = self
.blocks_by_number
.entry_by_ref(&ancestor.num)

View File

@ -355,12 +355,12 @@ impl Web3Rpcs {
if futures.is_empty() {
// no transaction or block subscriptions.
// TODO: i don't like this. it's a hack to keep the tokio task alive
let handle = tokio::task::Builder::default()
.name("noop")
.spawn(async move {
loop {
sleep(Duration::from_secs(600)).await;
// TODO: "every interval, do a health check or disconnect the rpc"
}
})?;
@ -619,12 +619,13 @@ impl Web3Rpcs {
},
_ = sleep_until(start + max_wait) => break,
}
yield_now().await;
} else {
trace!("no potential rpcs and set to not wait");
break;
}
yield_now().await;
// clear for the next loop
potential_rpcs.clear();
}

View File

@ -27,6 +27,7 @@ use std::hash::{Hash, Hasher};
use std::sync::atomic::{self, AtomicU32, AtomicU64, AtomicUsize};
use std::{cmp::Ordering, sync::Arc};
use tokio::sync::{mpsc, watch, RwLock as AsyncRwLock};
use tokio::task::yield_now;
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
use tracing::{debug, error, info, trace, warn, Level};
use url::Url;
@ -673,6 +674,8 @@ impl Web3Rpc {
break;
}
yield_now().await;
disconnect_watch_rx.changed().await?;
}
trace!("disconnect triggered on {}", rpc);
@ -896,6 +899,8 @@ impl Web3Rpc {
}
sleep_until(retry_at).await;
yield_now().await;
}
Ok(OpenRequestResult::NotReady) => {
// TODO: when can this happen? log? emit a stat?

View File

@ -12,7 +12,6 @@ use migration::sea_orm::prelude::Decimal;
use std::time::Duration;
use tokio::select;
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio::task::yield_now;
use tokio::time::{interval, sleep};
use tracing::{debug, error, info, trace, warn, Instrument};
@ -197,7 +196,6 @@ impl StatBuffer {
break;
}
}
yield_now().await;
}
// TODO: wait on all websockets to close

View File

@ -64,7 +64,7 @@ impl ProxydSubCommand {
/// this shouldn't really be pub except it makes test fixtures easier
#[allow(clippy::too_many_arguments)]
pub async fn _main(
mut top_config: TopConfig,
top_config: TopConfig,
top_config_path: Option<PathBuf>,
frontend_port: Arc<AtomicU16>,
prometheus_port: Arc<AtomicU16>,
@ -136,6 +136,7 @@ impl ProxydSubCommand {
}
// TODO: wait for SIGHUP instead?
// TODO: wait for file to change instead of polling?
thread::sleep(Duration::from_secs(10));
});
}
@ -171,7 +172,6 @@ impl ProxydSubCommand {
}
}
}
yield_now().await;
}
// start the frontend port