add a spot for retries

This commit is contained in:
Bryan Stitt 2023-10-13 00:22:57 -07:00
parent 0769e26afb
commit aa3b40e03f

View File

@ -47,7 +47,7 @@ use std::time::Duration;
use tokio::select;
use tokio::sync::{broadcast, mpsc, oneshot, watch, Semaphore};
use tokio::task::{yield_now, JoinHandle};
use tokio::time::{sleep, timeout, timeout_at, Instant};
use tokio::time::{sleep, sleep_until, timeout, timeout_at, Instant};
use tracing::{error, info, trace, warn};
// TODO: make this customizable?
@ -1095,28 +1095,68 @@ impl App {
// TODO: this clone is only for an error response. refactor to not need it
let error_id = request.id.clone();
let web3_request = match ValidatedRequest::new_with_app(
self,
authorization,
None,
None,
request.into(),
head_block,
)
.await
{
Ok(x) => x,
Err(err) => {
let (a, b) = err.as_json_response_parts(error_id);
let mut last_success = None;
let mut last_error = None;
let mut web3_request;
return (a, b, vec![]);
// TODO: think more about how to handle retries without hammering our servers with errors
let mut ranked_rpcs = self.balanced_rpcs.watch_ranked_rpcs.subscribe();
let latest_start = Instant::now() + Duration::from_secs(3);
// TODO: how many retries?
loop {
// TODO: refresh the request instead of making new each time. then we need less clones
web3_request = match ValidatedRequest::new_with_app(
self,
authorization.clone(),
None,
None,
request.clone().into(),
head_block.clone(),
)
.await
{
Ok(x) => x,
Err(err) => {
let (a, b) = err.as_json_response_parts(error_id);
let rpcs = vec![];
return (a, b, rpcs);
}
};
// turn some of the Web3ProxyErrors into Ok results
match self._proxy_request_with_caching(&web3_request).await {
Ok(response_data) => {
last_success = Some(response_data);
break;
}
Err(err) => {
last_error = Some(err);
}
}
select! {
_ = ranked_rpcs.changed() => {
// TODO: pass these RankedRpcs to ValidatedRequest::new_with_app
ranked_rpcs.borrow_and_update();
}
_ = sleep_until(latest_start) => {
// do not retry if we've already been trying for 3 seconds
break;
}
}
}
let last_response = if let Some(last_success) = last_success {
Ok(last_success)
} else {
Err(last_error.unwrap_or(anyhow::anyhow!("no success or error").into()))
};
// TODO: trace/kafka log request.params before we send them to _proxy_request_with_caching which might modify them
// turn some of the Web3ProxyErrors into Ok results
let (code, response) = match self._proxy_request_with_caching(&web3_request).await {
let (code, response) = match last_response {
Ok(response_data) => {
web3_request.error_response.store(false, Ordering::Relaxed);
web3_request