From bc8040f61d19845868d48e095020d45b10119c45 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Fri, 4 Aug 2023 18:54:16 -0700
Subject: [PATCH] more yields and less timeouts

---
 web3_proxy/src/app/mod.rs                 | 35 +++++++++--------------
 web3_proxy/src/rpcs/blockchain.rs         |  3 +-
 web3_proxy/src/rpcs/many.rs               |  5 ++--
 web3_proxy/src/rpcs/one.rs                |  5 ++++
 web3_proxy/src/stats/stat_buffer.rs       |  2 --
 web3_proxy_cli/src/sub_commands/proxyd.rs |  4 +--
 6 files changed, 25 insertions(+), 29 deletions(-)

diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs
index 9bf76dcb..9e124a09 100644
--- a/web3_proxy/src/app/mod.rs
+++ b/web3_proxy/src/app/mod.rs
@@ -52,7 +52,7 @@ use std::sync::{atomic, Arc};
 use std::time::Duration;
 use tokio::select;
 use tokio::sync::{broadcast, mpsc, oneshot, watch, Semaphore};
-use tokio::task::{JoinHandle, yield_now};
+use tokio::task::{yield_now, JoinHandle};
 use tokio::time::{sleep, timeout};
 use tracing::{error, info, trace, warn, Level};
 
@@ -1190,7 +1190,9 @@ impl Web3ProxyApp {
                 tries += 1;
                 if tries < max_tries {
                     // try again
-                    continue
+                    yield_now().await;
+
+                    continue;
                 }
 
                 request_metadata
@@ -1211,7 +1213,7 @@ impl Web3ProxyApp {
             // there might be clones in the background, so this isn't a sure thing
             let _ = request_metadata.try_send_arc_stat();
 
-            return (code, response, rpcs)
+            return (code, response, rpcs);
         }
     }
@@ -1431,7 +1433,6 @@ impl Web3ProxyApp {
                         method,
                         params,
                         Some(request_metadata),
-                        Some(Duration::from_secs(30)),
                         // TODO: should this be block 0 instead?
                         Some(&U64::one()),
@@ -1705,9 +1706,6 @@ impl Web3ProxyApp {
                     }
                 };
 
-                // TODO: different timeouts for different user tiers. get the duration out of the request_metadata
-                let backend_request_timetout = Duration::from_secs(240);
-
                 if let Some(cache_key) = cache_key {
                     let from_block_num = cache_key.from_block_num().copied();
                     let to_block_num = cache_key.to_block_num().copied();
@@ -1721,20 +1719,17 @@ impl Web3ProxyApp {
                     self
                         .jsonrpc_response_cache
                         .try_get_with::<_, Web3ProxyError>(cache_key.hash(), async {
-                            // TODO: think more about this timeout and test that it works well!
-                            let response_data = timeout(
-                                backend_request_timetout + Duration::from_millis(100),
-                                self.balanced_rpcs
+                            // TODO: think more about this timeout. we should probably have a `request_expires_at` Duration on the request_metadata
+                            let response_data = self.balanced_rpcs
                                 .try_proxy_connection::<_, Arc>(
                                     method,
                                     params,
                                     Some(request_metadata),
-
-                                    Some(backend_request_timetout),
+                                    Some(Duration::from_secs(240)),
                                     from_block_num.as_ref(),
                                     to_block_num.as_ref(),
-                                ))
-                            .await?;
+                                )
+                                .await;
 
                             if !cache_jsonrpc_errors && let Err(err) = response_data {
                                 // if we are not supposed to cache jsonrpc errors,
@@ -1758,20 +1753,16 @@ impl Web3ProxyApp {
                         }
                     }).await?
             } else {
-                let x = timeout(
-                    backend_request_timetout + Duration::from_millis(100),
-                    self.balanced_rpcs
+                let x = self.balanced_rpcs
                     .try_proxy_connection::<_, Arc>(
                         method,
                         params,
                         Some(request_metadata),
-
-                        Some(backend_request_timetout),
+                        Some(Duration::from_secs(240)),
                         None,
                         None,
                     )
-                )
-                .await??;
+                    .await?;
 
                 x.into()
             }
diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs
index 7ad4a824..f12e8bda 100644
--- a/web3_proxy/src/rpcs/blockchain.rs
+++ b/web3_proxy/src/rpcs/blockchain.rs
@@ -205,7 +205,8 @@ impl Web3Rpcs {
             num: parent_num,
             hash: *block.parent_hash(),
         };
-        loop {
+        // TODO: smarter max loop on this
+        for _ in 0..16 {
             let ancestor_number_to_hash_entry = self
                 .blocks_by_number
                 .entry_by_ref(&ancestor.num)
diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs
index 0b463fee..123c3384 100644
--- a/web3_proxy/src/rpcs/many.rs
+++ b/web3_proxy/src/rpcs/many.rs
@@ -355,12 +355,12 @@ impl Web3Rpcs {
 
         if futures.is_empty() {
             // no transaction or block subscriptions.
+            // TODO: i don't like this. it's a hack to keep the tokio task alive
             let handle = tokio::task::Builder::default()
                 .name("noop")
                 .spawn(async move {
                     loop {
                         sleep(Duration::from_secs(600)).await;
-                        // TODO: "every interval, do a health check or disconnect the rpc"
                     }
                 })?;
 
@@ -619,12 +619,13 @@ impl Web3Rpcs {
                         },
                         _ = sleep_until(start + max_wait) => break,
                     }
-                    yield_now().await;
                 } else {
                     trace!("no potential rpcs and set to not wait");
                     break;
                 }
 
+                yield_now().await;
+
                 // clear for the next loop
                 potential_rpcs.clear();
             }
diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs
index 2366dc35..6dfec1aa 100644
--- a/web3_proxy/src/rpcs/one.rs
+++ b/web3_proxy/src/rpcs/one.rs
@@ -27,6 +27,7 @@ use std::hash::{Hash, Hasher};
 use std::sync::atomic::{self, AtomicU32, AtomicU64, AtomicUsize};
 use std::{cmp::Ordering, sync::Arc};
 use tokio::sync::{mpsc, watch, RwLock as AsyncRwLock};
+use tokio::task::yield_now;
 use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
 use tracing::{debug, error, info, trace, warn, Level};
 use url::Url;
@@ -673,6 +674,8 @@ impl Web3Rpc {
                 break;
             }
 
+            yield_now().await;
+
             disconnect_watch_rx.changed().await?;
         }
         trace!("disconnect triggered on {}", rpc);
@@ -896,6 +899,8 @@ impl Web3Rpc {
                     }
 
                     sleep_until(retry_at).await;
+
+                    yield_now().await;
                 }
                 Ok(OpenRequestResult::NotReady) => {
                     // TODO: when can this happen? log? emit a stat?
diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs
index 07229452..7127bfde 100644
--- a/web3_proxy/src/stats/stat_buffer.rs
+++ b/web3_proxy/src/stats/stat_buffer.rs
@@ -12,7 +12,6 @@ use migration::sea_orm::prelude::Decimal;
 use std::time::Duration;
 use tokio::select;
 use tokio::sync::{broadcast, mpsc, oneshot};
-use tokio::task::yield_now;
 use tokio::time::{interval, sleep};
 use tracing::{debug, error, info, trace, warn, Instrument};
 
@@ -197,7 +196,6 @@ impl StatBuffer {
                     break;
                 }
             }
-            yield_now().await;
         }
 
         // TODO: wait on all websockets to close
diff --git a/web3_proxy_cli/src/sub_commands/proxyd.rs b/web3_proxy_cli/src/sub_commands/proxyd.rs
index a82e8d02..fea70b2e 100644
--- a/web3_proxy_cli/src/sub_commands/proxyd.rs
+++ b/web3_proxy_cli/src/sub_commands/proxyd.rs
@@ -64,7 +64,7 @@ impl ProxydSubCommand {
     /// this shouldn't really be pub except it makes test fixtures easier
     #[allow(clippy::too_many_arguments)]
     pub async fn _main(
-        mut top_config: TopConfig,
+        top_config: TopConfig,
         top_config_path: Option,
         frontend_port: Arc,
         prometheus_port: Arc,
@@ -136,6 +136,7 @@ impl ProxydSubCommand {
                 }
 
                 // TODO: wait for SIGHUP instead?
+                // TODO: wait for file to change instead of polling?
                 thread::sleep(Duration::from_secs(10));
             });
         }
@@ -171,7 +172,6 @@ impl ProxydSubCommand {
                 }
             }
         }
-        yield_now().await;
     }
 
     // start the frontend port
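
Note: the sketch below is not part of the patch; it is only a minimal illustration of the pattern the commit applies. Instead of wrapping whole backend calls in tokio::time::timeout, hot retry loops now call tokio::task::yield_now().await between iterations (and the per-request Duration is passed down to try_proxy_connection), so other tasks on the runtime get a chance to run. The names try_once, call_with_retries, and MAX_TRIES are hypothetical and do not exist in web3-proxy.

use std::time::Duration;
use tokio::task::yield_now;
use tokio::time::sleep;

const MAX_TRIES: usize = 3;

// stand-in for a backend RPC call that can fail transiently
async fn try_once(attempt: usize) -> Result<&'static str, &'static str> {
    sleep(Duration::from_millis(10)).await;
    if attempt + 1 < MAX_TRIES {
        Err("backend not ready")
    } else {
        Ok("response")
    }
}

async fn call_with_retries() -> Result<&'static str, &'static str> {
    let mut tries = 0;
    loop {
        match try_once(tries).await {
            Ok(response) => return Ok(response),
            Err(_) if tries + 1 < MAX_TRIES => {
                tries += 1;
                // cooperatively hand control back to the scheduler before retrying,
                // rather than spinning or relying on an outer timeout() wrapper
                yield_now().await;
            }
            Err(err) => return Err(err),
        }
    }
}

#[tokio::main]
async fn main() {
    println!("{:?}", call_with_retries().await);
}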