diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 499cbe01..8202ce32 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -46,7 +46,7 @@ use std::sync::Arc; use std::time::Duration; use tokio::select; use tokio::sync::{broadcast, mpsc, oneshot, watch, Semaphore}; -use tokio::task::JoinHandle; +use tokio::task::{yield_now, JoinHandle}; use tokio::time::{sleep, timeout, timeout_at, Instant}; use tracing::{error, info, trace, warn}; @@ -584,6 +584,8 @@ impl App { _ = new_top_config_receiver.changed() => {} } } + + yield_now().await; } Ok(()) @@ -1140,72 +1142,50 @@ impl App { // TODO: trace/kafka log request.params before we send them to _proxy_request_with_caching which might modify them // turn some of the Web3ProxyErrors into Ok results - let max_tries = 3; - loop { - let tries = web3_request.backend_requests.lock().len(); + let (code, response) = match self._proxy_request_with_caching(&web3_request).await { + Ok(response_data) => { + web3_request.error_response.store(false, Ordering::Relaxed); + web3_request + .user_error_response + .store(false, Ordering::Relaxed); - if tries > 0 { - // exponential backoff with jitter - // TODO: wait for RankedRpcs to change instead of this arbitrary sleep - // TODO: refresh the head block and any replacements of "latest" on the web3_request? - sleep(Duration::from_millis(100)).await; + (StatusCode::OK, response_data) } + Err(err @ Web3ProxyError::NullJsonRpcResult) => { + web3_request.error_response.store(false, Ordering::Relaxed); + web3_request + .user_error_response + .store(false, Ordering::Relaxed); - let (code, response) = match self._proxy_request_with_caching(&web3_request).await { - Ok(response_data) => { - web3_request.error_response.store(false, Ordering::Relaxed); - web3_request - .user_error_response - .store(false, Ordering::Relaxed); + err.as_json_response_parts(web3_request.id()) + } + Err(Web3ProxyError::JsonRpcResponse(response_data)) => { + web3_request.error_response.store(false, Ordering::Relaxed); + web3_request + .user_error_response + .store(response_data.is_error(), Ordering::Relaxed); - (StatusCode::OK, response_data) - } - Err(err @ Web3ProxyError::NullJsonRpcResult) => { - web3_request.error_response.store(false, Ordering::Relaxed); - web3_request - .user_error_response - .store(false, Ordering::Relaxed); + let response = + jsonrpc::ParsedResponse::from_response_data(response_data, web3_request.id()); + (StatusCode::OK, response.into()) + } + Err(err) => { + // max tries exceeded. return the error - err.as_json_response_parts(web3_request.id()) - } - Err(Web3ProxyError::JsonRpcResponse(response_data)) => { - web3_request.error_response.store(false, Ordering::Relaxed); - web3_request - .user_error_response - .store(response_data.is_error(), Ordering::Relaxed); + web3_request.error_response.store(true, Ordering::Relaxed); + web3_request + .user_error_response + .store(false, Ordering::Relaxed); - let response = jsonrpc::ParsedResponse::from_response_data( - response_data, - web3_request.id(), - ); - (StatusCode::OK, response.into()) - } - Err(err) => { - if tries <= max_tries { - // TODO: log the error before retrying - continue; - } + err.as_json_response_parts(web3_request.id()) + } + }; - // max tries exceeded. return the error + web3_request.add_response(&response); - web3_request.error_response.store(true, Ordering::Relaxed); - web3_request - .user_error_response - .store(false, Ordering::Relaxed); + let rpcs = web3_request.backend_rpcs_used(); - err.as_json_response_parts(web3_request.id()) - } - }; - - web3_request.add_response(&response); - - let rpcs = web3_request.backend_rpcs_used(); - - // there might be clones in the background, so this isn't a sure thing - let _ = web3_request.try_send_arc_stat(); - - return (code, response, rpcs); - } + (code, response, rpcs) } /// main logic for proxy_cached_request but in a dedicated function so the try operator is easy to use diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index f7956464..af8cb3af 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -519,7 +519,7 @@ impl Web3Rpcs { // TODO: limit number of tries let rpcs = self.try_rpcs_for_request(web3_request).await?; - let stream = rpcs.to_stream(); + let stream = rpcs.to_stream().take(3); pin!(stream); @@ -640,11 +640,8 @@ impl Web3Rpcs { let watch_consensus_rpcs_receivers = self.watch_ranked_rpcs.receiver_count(); - let watch_consensus_head_receivers = if let Some(ref x) = self.watch_head_block { - Some(x.receiver_count()) - } else { - None - }; + let watch_consensus_head_receivers = + self.watch_head_block.as_ref().map(|x| x.receiver_count()); json!({ "conns": rpcs,