move retries into request_with_metadata

This commit is contained in:
Bryan Stitt 2023-10-11 20:58:13 -07:00
parent f696195fed
commit 4009995bb3
2 changed files with 41 additions and 64 deletions

View File

@@ -46,7 +46,7 @@ use std::sync::Arc;
 use std::time::Duration;
 use tokio::select;
 use tokio::sync::{broadcast, mpsc, oneshot, watch, Semaphore};
-use tokio::task::JoinHandle;
+use tokio::task::{yield_now, JoinHandle};
 use tokio::time::{sleep, timeout, timeout_at, Instant};
 use tracing::{error, info, trace, warn};
@@ -584,6 +584,8 @@ impl App {
                 _ = new_top_config_receiver.changed() => {}
             }
         }
+
+        yield_now().await;
     }
 
     Ok(())
@@ -1140,72 +1142,50 @@ impl App {
         // TODO: trace/kafka log request.params before we send them to _proxy_request_with_caching which might modify them
 
         // turn some of the Web3ProxyErrors into Ok results
-        let max_tries = 3;
-
-        loop {
-            let tries = web3_request.backend_requests.lock().len();
-
-            if tries > 0 {
-                // exponential backoff with jitter
-                // TODO: wait for RankedRpcs to change instead of this arbitrary sleep
-                // TODO: refresh the head block and any replacements of "latest" on the web3_request?
-                sleep(Duration::from_millis(100)).await;
-            }
-
-            let (code, response) = match self._proxy_request_with_caching(&web3_request).await {
-                Ok(response_data) => {
-                    web3_request.error_response.store(false, Ordering::Relaxed);
-                    web3_request
-                        .user_error_response
-                        .store(false, Ordering::Relaxed);
-
-                    (StatusCode::OK, response_data)
-                }
-                Err(err @ Web3ProxyError::NullJsonRpcResult) => {
-                    web3_request.error_response.store(false, Ordering::Relaxed);
-                    web3_request
-                        .user_error_response
-                        .store(false, Ordering::Relaxed);
-
-                    err.as_json_response_parts(web3_request.id())
-                }
-                Err(Web3ProxyError::JsonRpcResponse(response_data)) => {
-                    web3_request.error_response.store(false, Ordering::Relaxed);
-                    web3_request
-                        .user_error_response
-                        .store(response_data.is_error(), Ordering::Relaxed);
-
-                    let response = jsonrpc::ParsedResponse::from_response_data(
-                        response_data,
-                        web3_request.id(),
-                    );
-                    (StatusCode::OK, response.into())
-                }
-                Err(err) => {
-                    if tries <= max_tries {
-                        // TODO: log the error before retrying
-                        continue;
-                    }
-
-                    // max tries exceeded. return the error
-                    web3_request.error_response.store(true, Ordering::Relaxed);
-                    web3_request
-                        .user_error_response
-                        .store(false, Ordering::Relaxed);
-
-                    err.as_json_response_parts(web3_request.id())
-                }
-            };
-
-            web3_request.add_response(&response);
-            let rpcs = web3_request.backend_rpcs_used();
-
-            // there might be clones in the background, so this isn't a sure thing
-            let _ = web3_request.try_send_arc_stat();
-
-            return (code, response, rpcs);
-        }
+        let (code, response) = match self._proxy_request_with_caching(&web3_request).await {
+            Ok(response_data) => {
+                web3_request.error_response.store(false, Ordering::Relaxed);
+                web3_request
+                    .user_error_response
+                    .store(false, Ordering::Relaxed);
+
+                (StatusCode::OK, response_data)
+            }
+            Err(err @ Web3ProxyError::NullJsonRpcResult) => {
+                web3_request.error_response.store(false, Ordering::Relaxed);
+                web3_request
+                    .user_error_response
+                    .store(false, Ordering::Relaxed);
+
+                err.as_json_response_parts(web3_request.id())
+            }
+            Err(Web3ProxyError::JsonRpcResponse(response_data)) => {
+                web3_request.error_response.store(false, Ordering::Relaxed);
+                web3_request
+                    .user_error_response
+                    .store(response_data.is_error(), Ordering::Relaxed);
+
+                let response =
+                    jsonrpc::ParsedResponse::from_response_data(response_data, web3_request.id());
+                (StatusCode::OK, response.into())
+            }
+            Err(err) => {
+                // max tries exceeded. return the error
+                web3_request.error_response.store(true, Ordering::Relaxed);
+                web3_request
+                    .user_error_response
+                    .store(false, Ordering::Relaxed);
+
+                err.as_json_response_parts(web3_request.id())
+            }
+        };
+
+        web3_request.add_response(&response);
+        let rpcs = web3_request.backend_rpcs_used();
+
+        (code, response, rpcs)
     }
/// main logic for proxy_cached_request but in a dedicated function so the try operator is easy to use /// main logic for proxy_cached_request but in a dedicated function so the try operator is easy to use

View File

@@ -519,7 +519,7 @@ impl Web3Rpcs {
         // TODO: limit number of tries
         let rpcs = self.try_rpcs_for_request(web3_request).await?;
 
-        let stream = rpcs.to_stream();
+        let stream = rpcs.to_stream().take(3);
 
         pin!(stream);
@@ -640,11 +640,8 @@ impl Web3Rpcs {
         let watch_consensus_rpcs_receivers = self.watch_ranked_rpcs.receiver_count();
 
-        let watch_consensus_head_receivers = if let Some(ref x) = self.watch_head_block {
-            Some(x.receiver_count())
-        } else {
-            None
-        };
+        let watch_consensus_head_receivers =
+            self.watch_head_block.as_ref().map(|x| x.receiver_count());
 
         json!({
             "conns": rpcs,