Revert "Revert "remove max_tries for now""

This reverts commit 5fbcb75157cd012b723c9d2b37bb7947e5e05aa5.
This commit is contained in:
Bryan Stitt 2023-08-03 09:15:31 -07:00
parent ec0b733c8e
commit 74224977b7
6 changed files with 28 additions and 132 deletions

@ -1169,7 +1169,6 @@ impl Web3ProxyApp {
&request.method,
&mut request.params,
head_block,
Some(2),
&request_metadata,
)
.await
@ -1224,7 +1223,6 @@ impl Web3ProxyApp {
method: &str,
params: &mut serde_json::Value,
head_block: Option<&Web3ProxyBlock>,
max_tries: Option<usize>,
request_metadata: &Arc<RequestMetadata>,
) -> Web3ProxyResult<JsonRpcResponseEnum<Arc<RawValue>>> {
// TODO: don't clone into a new string?
@ -1338,7 +1336,6 @@ impl Web3ProxyApp {
method,
params,
Some(request_metadata),
max_tries,
Some(Duration::from_secs(30)),
None,
None,
@ -1378,7 +1375,6 @@ impl Web3ProxyApp {
method,
params,
Some(request_metadata),
max_tries,
Some(Duration::from_secs(30)),
None,
None,
@ -1412,7 +1408,6 @@ impl Web3ProxyApp {
method,
params,
Some(request_metadata),
max_tries,
Some(Duration::from_secs(30)),
None,
None,
@ -1437,7 +1432,7 @@ impl Web3ProxyApp {
method,
params,
Some(request_metadata),
max_tries,
Some(Duration::from_secs(30)),
// TODO: should this be block 0 instead?
Some(&U64::one()),
@ -1739,7 +1734,7 @@ impl Web3ProxyApp {
method,
params,
Some(request_metadata),
max_tries,
Some(backend_request_timetout),
from_block_num.as_ref(),
to_block_num.as_ref(),
@ -1775,7 +1770,7 @@ impl Web3ProxyApp {
method,
params,
Some(request_metadata),
max_tries,
Some(backend_request_timetout),
None,
None,

@ -98,7 +98,7 @@ pub async fn clean_block_number(
serde_json::from_value(block_hash).context("decoding blockHash")?;
let block = rpcs
.block(&block_hash, None, Some(3), None)
.block(&block_hash, None, None)
.await
.context("fetching block number from hash")?;
@ -117,7 +117,7 @@ pub async fn clean_block_number(
.context("fetching block hash from number")?;
let block = rpcs
.block(&block_hash, None, Some(3), None)
.block(&block_hash, None, None)
.await
.context("fetching block from hash")?;
@ -138,7 +138,7 @@ pub async fn clean_block_number(
.context("fetching block hash from number")?;
let block = rpcs
.block(&block_hash, None, Some(3), None)
.block(&block_hash, None, None)
.await
.context("fetching block from hash")?;
@ -146,7 +146,7 @@ pub async fn clean_block_number(
}
} else if let Ok(block_hash) = serde_json::from_value::<H256>(x.clone()) {
let block = rpcs
.block(&block_hash, None, Some(3), None)
.block(&block_hash, None, None)
.await
.context("fetching block number from hash")?;

@ -267,7 +267,6 @@ impl Web3Rpcs {
&self,
hash: &H256,
rpc: Option<&Arc<Web3Rpc>>,
max_tries: Option<usize>,
max_wait: Option<Duration>,
) -> Web3ProxyResult<Web3ProxyBlock> {
// first, try to get the hash from our cache
@ -285,7 +284,7 @@ impl Web3Rpcs {
}
// hashes don't match! this block must be in the middle of being uncled
// TODO: check known uncles
// TODO: check known uncles. clear uncle caches
}
if hash == &H256::zero() {
@ -303,7 +302,6 @@ impl Web3Rpcs {
"eth_getBlockByHash",
&get_block_params,
None,
max_tries,
max_wait,
)
.await
@ -321,7 +319,6 @@ impl Web3Rpcs {
.internal_request::<_, Option<ArcBlock>>(
"eth_getBlockByHash",
&get_block_params,
max_tries,
max_wait,
)
.await?;
@ -393,7 +390,7 @@ impl Web3Rpcs {
if let Some(block_hash) = self.blocks_by_number.get(num) {
// TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set
// TODO: configurable max wait and rpc
let block = self.block(&block_hash, None, Some(3), None).await?;
let block = self.block(&block_hash, None, None).await?;
return Ok((block, block_depth));
}
@ -401,12 +398,7 @@ impl Web3Rpcs {
// block number not in cache. we need to ask an rpc for it
// TODO: this error is too broad
let response = self
.internal_request::<_, Option<ArcBlock>>(
"eth_getBlockByNumber",
&(*num, false),
Some(3),
None,
)
.internal_request::<_, Option<ArcBlock>>("eth_getBlockByNumber", &(*num, false), None)
.await?
.ok_or(Web3ProxyError::NoBlocksKnown)?;

@ -860,7 +860,7 @@ impl ConsensusFinder {
let parent_hash = block_to_check.parent_hash();
match web3_rpcs.block(parent_hash, Some(rpc), Some(1), None).await {
match web3_rpcs.block(parent_hash, Some(rpc), None).await {
Ok(parent_block) => block_to_check = parent_block,
Err(err) => {
debug!(

@ -32,7 +32,7 @@ use std::sync::Arc;
use tokio::select;
use tokio::sync::{mpsc, watch};
use tokio::time::{sleep, sleep_until, Duration, Instant};
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, error, info, instrument, trace, warn};
/// A collection of web3 connections. Sends requests either the current best server or all servers.
#[derive(From)]
@ -499,6 +499,7 @@ impl Web3Rpcs {
}
}
#[instrument(level = "trace")]
pub async fn wait_for_best_rpc(
&self,
request_metadata: Option<&Arc<RequestMetadata>>,
@ -536,6 +537,7 @@ impl Web3Rpcs {
.all()
.iter()
.filter(|rpc| {
// TODO: instrument this?
ranked_rpcs.rpc_will_work_now(
skip_rpcs,
min_block_needed,
@ -586,6 +588,7 @@ impl Web3Rpcs {
ShouldWaitForBlock::Wait { .. } => select! {
_ = watch_ranked_rpcs.changed() => {
// no need to borrow_and_update because we do that at the top of the loop
trace!("watch ranked rpcs changed");
},
_ = sleep_until(start + max_wait) => break,
},
@ -594,13 +597,16 @@ impl Web3Rpcs {
break;
}
} else if let Some(max_wait) = max_wait {
trace!(max_wait = max_wait.as_secs_f32(), "no potential rpcs");
select! {
_ = watch_ranked_rpcs.changed() => {
// no need to borrow_and_update because we do that at the top of the loop
trace!("watch ranked rpcs changed");
},
_ = sleep_until(start + max_wait) => break,
}
} else {
trace!("no potential rpcs and set to not wait");
break;
}
@ -726,64 +732,11 @@ impl Web3Rpcs {
&self,
method: &str,
params: &P,
max_tries: Option<usize>,
max_wait: Option<Duration>,
) -> Web3ProxyResult<R> {
// TODO: no request_metadata means we won't have stats on this internal request.
self.request_with_metadata_and_retries(
method, params, None, max_tries, max_wait, None, None,
)
.await
}
/// Make a request with stat tracking.
#[allow(clippy::too_many_arguments)]
pub async fn request_with_metadata_and_retries<P: JsonRpcParams, R: JsonRpcResultData>(
&self,
method: &str,
params: &P,
request_metadata: Option<&Arc<RequestMetadata>>,
max_tries: Option<usize>,
max_wait: Option<Duration>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
) -> Web3ProxyResult<R> {
let mut tries = max_tries.unwrap_or(1);
let mut last_error = None;
while tries > 0 {
tries -= 1;
match self
.request_with_metadata(
method,
params,
request_metadata,
max_wait,
min_block_needed,
max_block_needed,
)
.await
{
Ok(x) => return Ok(x),
Err(Web3ProxyError::JsonRpcErrorData(err)) => {
// TODO: retry some of these? i think request_with_metadata is already smart enough though
return Err(err.into());
}
Err(err) => {
// TODO: only log params in dev
warn!(rpc=%self, %method, ?params, ?err, %tries, "retry-able error");
last_error = Some(err)
}
}
}
if let Some(err) = last_error {
return Err(err);
}
Err(anyhow::anyhow!("no response, but no error either. this is a bug").into())
self.request_with_metadata(method, params, None, max_wait, None, None)
.await
}
/// Make a request with stat tracking.
@ -1262,7 +1215,6 @@ impl Web3Rpcs {
method: &str,
params: &P,
request_metadata: Option<&Arc<RequestMetadata>>,
max_tries: Option<usize>,
max_wait: Option<Duration>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
@ -1271,11 +1223,10 @@ impl Web3Rpcs {
match proxy_mode {
ProxyMode::Debug | ProxyMode::Best => {
self.request_with_metadata_and_retries(
self.request_with_metadata(
method,
params,
request_metadata,
max_tries,
max_wait,
min_block_needed,
max_block_needed,

@ -303,7 +303,6 @@ impl Web3Rpc {
&[(); 0],
// error here are expected, so keep the level low
Some(Level::DEBUG.into()),
Some(2),
Some(Duration::from_secs(5)),
)
.await
@ -328,7 +327,6 @@ impl Web3Rpc {
)),
// error here are expected, so keep the level low
Some(Level::TRACE.into()),
Some(2),
Some(Duration::from_secs(5)),
)
.await;
@ -419,7 +417,6 @@ impl Web3Rpc {
"eth_chainId",
&[(); 0],
Some(Level::TRACE.into()),
Some(2),
Some(Duration::from_secs(5)),
)
.await?;
@ -546,7 +543,6 @@ impl Web3Rpc {
"eth_getTransactionByHash",
&(txid,),
error_handler,
Some(2),
Some(Duration::from_secs(5)),
)
.await?
@ -569,7 +565,6 @@ impl Web3Rpc {
"eth_getCode",
&(to, block_number),
error_handler,
Some(2),
Some(Duration::from_secs(5)),
)
.await?;
@ -803,7 +798,6 @@ impl Web3Rpc {
"eth_getBlockByNumber",
&("latest", false),
error_handler,
Some(2),
Some(Duration::from_secs(5)),
)
.await;
@ -839,7 +833,6 @@ impl Web3Rpc {
"eth_getBlockByNumber",
&("latest", false),
error_handler,
Some(2),
Some(Duration::from_secs(5)),
)
.await;
@ -979,20 +972,12 @@ impl Web3Rpc {
method: &str,
params: &P,
error_handler: Option<RequestErrorHandler>,
max_tries: Option<usize>,
max_wait: Option<Duration>,
) -> Web3ProxyResult<R> {
let authorization = Default::default();
self.authorized_request(
method,
params,
&authorization,
error_handler,
max_tries,
max_wait,
)
.await
self.authorized_request(method, params, &authorization, error_handler, max_wait)
.await
}
pub async fn authorized_request<P: JsonRpcParams, R: JsonRpcResultData>(
@ -1001,42 +986,15 @@ impl Web3Rpc {
params: &P,
authorization: &Arc<Authorization>,
error_handler: Option<RequestErrorHandler>,
max_tries: Option<usize>,
max_wait: Option<Duration>,
) -> Web3ProxyResult<R> {
// TODO: take max_wait as a function argument?
let mut tries = max_tries.unwrap_or(1);
let handle = self
.wait_for_request_handle(authorization, max_wait, error_handler)
.await?;
let mut last_error: Option<Web3ProxyError> = None;
let x = handle.request::<P, R>(method, params).await?;
while tries > 0 {
tries -= 1;
let handle = match self
.wait_for_request_handle(authorization, max_wait, error_handler)
.await
{
Ok(x) => x,
Err(err) => {
last_error = Some(err);
continue;
}
};
match handle.request::<P, R>(method, params).await {
Ok(x) => return Ok(x),
Err(err) => {
last_error = Some(err.into());
continue;
}
}
}
if let Some(last_error) = last_error {
return Err(last_error);
}
Err(anyhow::anyhow!("authorized_request failed in an unexpected way").into())
Ok(x)
}
}