remove max_tries for now

i think this is causing problematic looping
This commit is contained in:
Bryan Stitt 2023-08-02 22:07:28 -07:00
parent b69ac51438
commit e0d7d11398
6 changed files with 34 additions and 134 deletions

View File

@ -1169,7 +1169,6 @@ impl Web3ProxyApp {
&request.method, &request.method,
&mut request.params, &mut request.params,
head_block, head_block,
Some(2),
&request_metadata, &request_metadata,
) )
.await .await
@ -1224,7 +1223,6 @@ impl Web3ProxyApp {
method: &str, method: &str,
params: &mut serde_json::Value, params: &mut serde_json::Value,
head_block: Option<&Web3ProxyBlock>, head_block: Option<&Web3ProxyBlock>,
max_tries: Option<usize>,
request_metadata: &Arc<RequestMetadata>, request_metadata: &Arc<RequestMetadata>,
) -> Web3ProxyResult<JsonRpcResponseEnum<Arc<RawValue>>> { ) -> Web3ProxyResult<JsonRpcResponseEnum<Arc<RawValue>>> {
// TODO: don't clone into a new string? // TODO: don't clone into a new string?
@ -1338,7 +1336,6 @@ impl Web3ProxyApp {
method, method,
params, params,
Some(request_metadata), Some(request_metadata),
max_tries,
Some(Duration::from_secs(30)), Some(Duration::from_secs(30)),
None, None,
None, None,
@ -1378,7 +1375,6 @@ impl Web3ProxyApp {
method, method,
params, params,
Some(request_metadata), Some(request_metadata),
max_tries,
Some(Duration::from_secs(30)), Some(Duration::from_secs(30)),
None, None,
None, None,
@ -1412,7 +1408,6 @@ impl Web3ProxyApp {
method, method,
params, params,
Some(request_metadata), Some(request_metadata),
max_tries,
Some(Duration::from_secs(30)), Some(Duration::from_secs(30)),
None, None,
None, None,
@ -1437,7 +1432,7 @@ impl Web3ProxyApp {
method, method,
params, params,
Some(request_metadata), Some(request_metadata),
max_tries,
Some(Duration::from_secs(30)), Some(Duration::from_secs(30)),
// TODO: should this be block 0 instead? // TODO: should this be block 0 instead?
Some(&U64::one()), Some(&U64::one()),
@ -1739,7 +1734,7 @@ impl Web3ProxyApp {
method, method,
params, params,
Some(request_metadata), Some(request_metadata),
max_tries,
Some(backend_request_timetout), Some(backend_request_timetout),
from_block_num.as_ref(), from_block_num.as_ref(),
to_block_num.as_ref(), to_block_num.as_ref(),
@ -1775,7 +1770,7 @@ impl Web3ProxyApp {
method, method,
params, params,
Some(request_metadata), Some(request_metadata),
max_tries,
Some(backend_request_timetout), Some(backend_request_timetout),
None, None,
None, None,

View File

@ -98,7 +98,7 @@ pub async fn clean_block_number(
serde_json::from_value(block_hash).context("decoding blockHash")?; serde_json::from_value(block_hash).context("decoding blockHash")?;
let block = rpcs let block = rpcs
.block(&block_hash, None, Some(3), None) .block(&block_hash, None, None)
.await .await
.context("fetching block number from hash")?; .context("fetching block number from hash")?;
@ -117,7 +117,7 @@ pub async fn clean_block_number(
.context("fetching block hash from number")?; .context("fetching block hash from number")?;
let block = rpcs let block = rpcs
.block(&block_hash, None, Some(3), None) .block(&block_hash, None, None)
.await .await
.context("fetching block from hash")?; .context("fetching block from hash")?;
@ -138,7 +138,7 @@ pub async fn clean_block_number(
.context("fetching block hash from number")?; .context("fetching block hash from number")?;
let block = rpcs let block = rpcs
.block(&block_hash, None, Some(3), None) .block(&block_hash, None, None)
.await .await
.context("fetching block from hash")?; .context("fetching block from hash")?;
@ -146,7 +146,7 @@ pub async fn clean_block_number(
} }
} else if let Ok(block_hash) = serde_json::from_value::<H256>(x.clone()) { } else if let Ok(block_hash) = serde_json::from_value::<H256>(x.clone()) {
let block = rpcs let block = rpcs
.block(&block_hash, None, Some(3), None) .block(&block_hash, None, None)
.await .await
.context("fetching block number from hash")?; .context("fetching block number from hash")?;

View File

@ -267,7 +267,6 @@ impl Web3Rpcs {
&self, &self,
hash: &H256, hash: &H256,
rpc: Option<&Arc<Web3Rpc>>, rpc: Option<&Arc<Web3Rpc>>,
max_tries: Option<usize>,
max_wait: Option<Duration>, max_wait: Option<Duration>,
) -> Web3ProxyResult<Web3ProxyBlock> { ) -> Web3ProxyResult<Web3ProxyBlock> {
// first, try to get the hash from our cache // first, try to get the hash from our cache
@ -285,7 +284,7 @@ impl Web3Rpcs {
} }
// hashes don't match! this block must be in the middle of being uncled // hashes don't match! this block must be in the middle of being uncled
// TODO: check known uncles // TODO: check known uncles. clear uncle caches
} }
if hash == &H256::zero() { if hash == &H256::zero() {
@ -303,10 +302,11 @@ impl Web3Rpcs {
"eth_getBlockByHash", "eth_getBlockByHash",
&get_block_params, &get_block_params,
None, None,
max_tries,
max_wait, max_wait,
) )
.await? .await
.ok()
.flatten()
} else { } else {
None None
}; };
@ -319,7 +319,6 @@ impl Web3Rpcs {
.internal_request::<_, Option<ArcBlock>>( .internal_request::<_, Option<ArcBlock>>(
"eth_getBlockByHash", "eth_getBlockByHash",
&get_block_params, &get_block_params,
max_tries,
max_wait, max_wait,
) )
.await?; .await?;
@ -391,7 +390,7 @@ impl Web3Rpcs {
if let Some(block_hash) = self.blocks_by_number.get(num) { if let Some(block_hash) = self.blocks_by_number.get(num) {
// TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set // TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set
// TODO: configurable max wait and rpc // TODO: configurable max wait and rpc
let block = self.block(&block_hash, None, Some(3), None).await?; let block = self.block(&block_hash, None, None).await?;
return Ok((block, block_depth)); return Ok((block, block_depth));
} }
@ -399,12 +398,7 @@ impl Web3Rpcs {
// block number not in cache. we need to ask an rpc for it // block number not in cache. we need to ask an rpc for it
// TODO: this error is too broad // TODO: this error is too broad
let response = self let response = self
.internal_request::<_, Option<ArcBlock>>( .internal_request::<_, Option<ArcBlock>>("eth_getBlockByNumber", &(*num, false), None)
"eth_getBlockByNumber",
&(*num, false),
Some(3),
None,
)
.await? .await?
.ok_or(Web3ProxyError::NoBlocksKnown)?; .ok_or(Web3ProxyError::NoBlocksKnown)?;

View File

@ -860,7 +860,7 @@ impl ConsensusFinder {
let parent_hash = block_to_check.parent_hash(); let parent_hash = block_to_check.parent_hash();
match web3_rpcs.block(parent_hash, Some(rpc), Some(1), None).await { match web3_rpcs.block(parent_hash, Some(rpc), None).await {
Ok(parent_block) => block_to_check = parent_block, Ok(parent_block) => block_to_check = parent_block,
Err(err) => { Err(err) => {
debug!( debug!(

View File

@ -32,7 +32,7 @@ use std::sync::Arc;
use tokio::select; use tokio::select;
use tokio::sync::{mpsc, watch}; use tokio::sync::{mpsc, watch};
use tokio::time::{sleep, sleep_until, Duration, Instant}; use tokio::time::{sleep, sleep_until, Duration, Instant};
use tracing::{debug, error, info, trace, warn}; use tracing::{debug, error, info, instrument, trace, warn};
/// A collection of web3 connections. Sends requests either the current best server or all servers. /// A collection of web3 connections. Sends requests either the current best server or all servers.
#[derive(From)] #[derive(From)]
@ -499,6 +499,7 @@ impl Web3Rpcs {
} }
} }
#[instrument(level = "trace")]
pub async fn wait_for_best_rpc( pub async fn wait_for_best_rpc(
&self, &self,
request_metadata: Option<&Arc<RequestMetadata>>, request_metadata: Option<&Arc<RequestMetadata>>,
@ -521,7 +522,9 @@ impl Web3Rpcs {
let mut potential_rpcs = Vec::new(); let mut potential_rpcs = Vec::new();
loop { // TODO: max loop?
for attempt in 0..32 {
trace!(attempt);
// TODO: need a change so that protected and 4337 rpcs set watch_consensus_rpcs on start // TODO: need a change so that protected and 4337 rpcs set watch_consensus_rpcs on start
let ranked_rpcs: Option<Arc<RankedRpcs>> = let ranked_rpcs: Option<Arc<RankedRpcs>> =
watch_ranked_rpcs.borrow_and_update().clone(); watch_ranked_rpcs.borrow_and_update().clone();
@ -535,6 +538,7 @@ impl Web3Rpcs {
.all() .all()
.iter() .iter()
.filter(|rpc| { .filter(|rpc| {
// TODO: instrument this?
ranked_rpcs.rpc_will_work_now( ranked_rpcs.rpc_will_work_now(
skip_rpcs, skip_rpcs,
min_block_needed, min_block_needed,
@ -585,19 +589,23 @@ impl Web3Rpcs {
ShouldWaitForBlock::Wait { .. } => select! { ShouldWaitForBlock::Wait { .. } => select! {
_ = watch_ranked_rpcs.changed() => { _ = watch_ranked_rpcs.changed() => {
// no need to borrow_and_update because we do that at the top of the loop // no need to borrow_and_update because we do that at the top of the loop
trace!("watch ranked rpcs changed");
}, },
_ = sleep_until(start + max_wait) => break, _ = sleep_until(start + max_wait) => break,
}, },
} }
} }
} else if let Some(max_wait) = max_wait { } else if let Some(max_wait) = max_wait {
trace!(max_wait = max_wait.as_secs_f32(), "no potential rpcs");
select! { select! {
_ = watch_ranked_rpcs.changed() => { _ = watch_ranked_rpcs.changed() => {
// no need to borrow_and_update because we do that at the top of the loop // no need to borrow_and_update because we do that at the top of the loop
trace!("watch ranked rpcs changed");
}, },
_ = sleep_until(start + max_wait) => break, _ = sleep_until(start + max_wait) => break,
} }
} else { } else {
trace!("no potential rpcs and set to not wait");
break; break;
} }
@ -723,64 +731,11 @@ impl Web3Rpcs {
&self, &self,
method: &str, method: &str,
params: &P, params: &P,
max_tries: Option<usize>,
max_wait: Option<Duration>, max_wait: Option<Duration>,
) -> Web3ProxyResult<R> { ) -> Web3ProxyResult<R> {
// TODO: no request_metadata means we won't have stats on this internal request. // TODO: no request_metadata means we won't have stats on this internal request.
self.request_with_metadata_and_retries( self.request_with_metadata(method, params, None, max_wait, None, None)
method, params, None, max_tries, max_wait, None, None, .await
)
.await
}
/// Make a request with stat tracking.
#[allow(clippy::too_many_arguments)]
pub async fn request_with_metadata_and_retries<P: JsonRpcParams, R: JsonRpcResultData>(
&self,
method: &str,
params: &P,
request_metadata: Option<&Arc<RequestMetadata>>,
max_tries: Option<usize>,
max_wait: Option<Duration>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
) -> Web3ProxyResult<R> {
let mut tries = max_tries.unwrap_or(1);
let mut last_error = None;
while tries > 0 {
tries -= 1;
match self
.request_with_metadata(
method,
params,
request_metadata,
max_wait,
min_block_needed,
max_block_needed,
)
.await
{
Ok(x) => return Ok(x),
Err(Web3ProxyError::JsonRpcErrorData(err)) => {
// TODO: retry some of these? i think request_with_metadata is already smart enough though
return Err(err.into());
}
Err(err) => {
// TODO: only log params in dev
warn!(rpc=%self, %method, ?params, ?err, %tries, "retry-able error");
last_error = Some(err)
}
}
}
if let Some(err) = last_error {
return Err(err);
}
Err(anyhow::anyhow!("no response, but no error either. this is a bug").into())
} }
/// Make a request with stat tracking. /// Make a request with stat tracking.
@ -1259,7 +1214,6 @@ impl Web3Rpcs {
method: &str, method: &str,
params: &P, params: &P,
request_metadata: Option<&Arc<RequestMetadata>>, request_metadata: Option<&Arc<RequestMetadata>>,
max_tries: Option<usize>,
max_wait: Option<Duration>, max_wait: Option<Duration>,
min_block_needed: Option<&U64>, min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>, max_block_needed: Option<&U64>,
@ -1268,11 +1222,10 @@ impl Web3Rpcs {
match proxy_mode { match proxy_mode {
ProxyMode::Debug | ProxyMode::Best => { ProxyMode::Debug | ProxyMode::Best => {
self.request_with_metadata_and_retries( self.request_with_metadata(
method, method,
params, params,
request_metadata, request_metadata,
max_tries,
max_wait, max_wait,
min_block_needed, min_block_needed,
max_block_needed, max_block_needed,

View File

@ -303,7 +303,6 @@ impl Web3Rpc {
&[(); 0], &[(); 0],
// error here are expected, so keep the level low // error here are expected, so keep the level low
Some(Level::DEBUG.into()), Some(Level::DEBUG.into()),
Some(2),
Some(Duration::from_secs(5)), Some(Duration::from_secs(5)),
) )
.await .await
@ -328,7 +327,6 @@ impl Web3Rpc {
)), )),
// error here are expected, so keep the level low // error here are expected, so keep the level low
Some(Level::TRACE.into()), Some(Level::TRACE.into()),
Some(2),
Some(Duration::from_secs(5)), Some(Duration::from_secs(5)),
) )
.await; .await;
@ -419,7 +417,6 @@ impl Web3Rpc {
"eth_chainId", "eth_chainId",
&[(); 0], &[(); 0],
Some(Level::TRACE.into()), Some(Level::TRACE.into()),
Some(2),
Some(Duration::from_secs(5)), Some(Duration::from_secs(5)),
) )
.await?; .await?;
@ -546,7 +543,6 @@ impl Web3Rpc {
"eth_getTransactionByHash", "eth_getTransactionByHash",
&(txid,), &(txid,),
error_handler, error_handler,
Some(2),
Some(Duration::from_secs(5)), Some(Duration::from_secs(5)),
) )
.await? .await?
@ -569,7 +565,6 @@ impl Web3Rpc {
"eth_getCode", "eth_getCode",
&(to, block_number), &(to, block_number),
error_handler, error_handler,
Some(2),
Some(Duration::from_secs(5)), Some(Duration::from_secs(5)),
) )
.await?; .await?;
@ -803,7 +798,6 @@ impl Web3Rpc {
"eth_getBlockByNumber", "eth_getBlockByNumber",
&("latest", false), &("latest", false),
error_handler, error_handler,
Some(2),
Some(Duration::from_secs(5)), Some(Duration::from_secs(5)),
) )
.await; .await;
@ -839,7 +833,6 @@ impl Web3Rpc {
"eth_getBlockByNumber", "eth_getBlockByNumber",
&("latest", false), &("latest", false),
error_handler, error_handler,
Some(2),
Some(Duration::from_secs(5)), Some(Duration::from_secs(5)),
) )
.await; .await;
@ -979,20 +972,12 @@ impl Web3Rpc {
method: &str, method: &str,
params: &P, params: &P,
error_handler: Option<RequestErrorHandler>, error_handler: Option<RequestErrorHandler>,
max_tries: Option<usize>,
max_wait: Option<Duration>, max_wait: Option<Duration>,
) -> Web3ProxyResult<R> { ) -> Web3ProxyResult<R> {
let authorization = Default::default(); let authorization = Default::default();
self.authorized_request( self.authorized_request(method, params, &authorization, error_handler, max_wait)
method, .await
params,
&authorization,
error_handler,
max_tries,
max_wait,
)
.await
} }
pub async fn authorized_request<P: JsonRpcParams, R: JsonRpcResultData>( pub async fn authorized_request<P: JsonRpcParams, R: JsonRpcResultData>(
@ -1001,42 +986,15 @@ impl Web3Rpc {
params: &P, params: &P,
authorization: &Arc<Authorization>, authorization: &Arc<Authorization>,
error_handler: Option<RequestErrorHandler>, error_handler: Option<RequestErrorHandler>,
max_tries: Option<usize>,
max_wait: Option<Duration>, max_wait: Option<Duration>,
) -> Web3ProxyResult<R> { ) -> Web3ProxyResult<R> {
// TODO: take max_wait as a function argument? let handle = self
let mut tries = max_tries.unwrap_or(1); .wait_for_request_handle(authorization, max_wait, error_handler)
.await?;
let mut last_error: Option<Web3ProxyError> = None; let x = handle.request::<P, R>(method, params).await?;
while tries > 0 { Ok(x)
tries -= 1;
let handle = match self
.wait_for_request_handle(authorization, max_wait, error_handler)
.await
{
Ok(x) => x,
Err(err) => {
last_error = Some(err);
continue;
}
};
match handle.request::<P, R>(method, params).await {
Ok(x) => return Ok(x),
Err(err) => {
last_error = Some(err.into());
continue;
}
}
}
if let Some(last_error) = last_error {
return Err(last_error);
}
Err(anyhow::anyhow!("authorized_request failed in an unexpected way").into())
} }
} }