add another layer of retries

This commit is contained in:
Bryan Stitt 2023-06-28 15:04:55 -07:00
parent a89312664c
commit df865292a7
7 changed files with 144 additions and 37 deletions

@ -55,7 +55,7 @@ use std::sync::{atomic, Arc};
use std::time::Duration;
use tokio::sync::{broadcast, watch, Semaphore};
use tokio::task::JoinHandle;
use tokio::time::timeout;
use tokio::time::{sleep, timeout};
use tracing::{error, info, trace, warn, Level};
// TODO: make this customizable?
@ -1136,6 +1136,7 @@ impl Web3ProxyApp {
// TODO: trace log request.params before we send them to _proxy_request_with_caching which might modify them
// TODO: I think we have sufficient retries elsewhere and this will just slow us down.
let mut tries = 3;
let mut last_code_and_response = None;
while tries > 0 {
@ -1144,6 +1145,7 @@ impl Web3ProxyApp {
&request.method,
&mut request.params,
head_block_num,
Some(2),
&request_metadata,
)
.await
@ -1158,11 +1160,14 @@ impl Web3ProxyApp {
break;
}
tries -= 1;
// TODO: emit a stat?
// TODO: only log params in development
warn!(method=%request.method, params=?request.params, response=?last_code_and_response, "request failed");
warn!(method=%request.method, params=%request.params, response=?last_code_and_response, "request failed ({} tries remain)", tries);
tries -= 1;
// TODO: sleep a randomized amount of time?
sleep(Duration::from_millis(10)).await;
}
let (code, response) = last_code_and_response.expect("there should always be a response");
@ -1184,6 +1189,7 @@ impl Web3ProxyApp {
method: &str,
params: &mut serde_json::Value,
head_block_num: Option<U64>,
max_tries: Option<usize>,
request_metadata: &Arc<RequestMetadata>,
) -> Web3ProxyResult<JsonRpcResponseEnum<Arc<RawValue>>> {
// TODO: don't clone into a new string?
@ -1299,6 +1305,7 @@ impl Web3ProxyApp {
method,
params,
Some(request_metadata),
max_tries,
Some(Duration::from_secs(30)),
None,
None,
@ -1340,6 +1347,7 @@ impl Web3ProxyApp {
method,
params,
Some(request_metadata),
max_tries,
Some(Duration::from_secs(30)),
None,
None,
@ -1373,6 +1381,7 @@ impl Web3ProxyApp {
method,
params,
Some(request_metadata),
max_tries,
Some(Duration::from_secs(30)),
None,
None,
@ -1397,7 +1406,9 @@ impl Web3ProxyApp {
method,
params,
Some(request_metadata),
max_tries,
Some(Duration::from_secs(30)),
// TODO: should this be block 0 instead?
Some(&U64::one()),
None,
)
@ -1642,7 +1653,7 @@ impl Web3ProxyApp {
let request_block = self
.balanced_rpcs
.block(&authorization, &request_block_hash, None, None)
.block(&authorization, &request_block_hash, None, Some(3), None)
.await?
.block;
@ -1672,7 +1683,7 @@ impl Web3ProxyApp {
let from_block = self
.balanced_rpcs
.block(&authorization, &from_block_hash, None, None)
.block(&authorization, &from_block_hash, None, Some(3), None)
.await?
.block;
@ -1683,7 +1694,7 @@ impl Web3ProxyApp {
let to_block = self
.balanced_rpcs
.block(&authorization, &to_block_hash, None, None)
.block(&authorization, &to_block_hash, None, Some(3), None)
.await?
.block;
@ -1717,6 +1728,7 @@ impl Web3ProxyApp {
method,
params,
Some(request_metadata),
max_tries,
Some(max_wait),
from_block_num.as_ref(),
to_block_num.as_ref(),
@ -1743,6 +1755,7 @@ impl Web3ProxyApp {
method,
params,
Some(request_metadata),
max_tries,
Some(max_wait),
None,
None,

@ -76,7 +76,7 @@ pub async fn clean_block_number(
serde_json::from_value(block_hash).context("decoding blockHash")?;
let block = rpcs
.block(authorization, &block_hash, None, None)
.block(authorization, &block_hash, None, Some(3), None)
.await
.context("fetching block number from hash")?;

@ -13,6 +13,7 @@ use axum::{
};
use derive_more::{Display, Error, From};
use ethers::prelude::ContractError;
use ethers::types::{H256, U64};
use http::header::InvalidHeaderValue;
use http::uri::InvalidUri;
use ipnet::AddrParseError;
@ -147,7 +148,14 @@ pub enum Web3ProxyError {
#[error(ignore)]
Timeout(Option<tokio::time::error::Elapsed>),
UlidDecode(ulid::DecodeError),
UnknownBlockNumber,
#[error(ignore)]
UnknownBlockHash(H256),
#[display(fmt = "known: {known}, unknown: {unknown}")]
#[error(ignore)]
UnknownBlockNumber {
known: U64,
unknown: U64,
},
UnknownKey,
UserAgentRequired,
#[error(ignore)]
@ -952,13 +960,28 @@ impl Web3ProxyError {
},
)
}
Self::UnknownBlockNumber => {
error!("UnknownBlockNumber");
Self::UnknownBlockHash(hash) => {
debug!(%hash, "UnknownBlockHash");
(
StatusCode::BAD_GATEWAY,
StatusCode::OK,
JsonRpcErrorData {
message: "no servers synced. unknown eth_blockNumber".into(),
code: StatusCode::BAD_GATEWAY.as_u16().into(),
message: format!("unknown block hash ({})", hash).into(),
code: None,
data: None,
},
)
}
Self::UnknownBlockNumber { known, unknown } => {
debug!(%known, %unknown, "UnknownBlockNumber");
(
StatusCode::OK,
JsonRpcErrorData {
message: format!(
"unknown block number. known: {}. unknown: {}",
known, unknown
)
.into(),
code: None,
data: None,
},
)
@ -1039,11 +1062,11 @@ impl Web3ProxyError {
)
}
Self::WebsocketOnly => {
trace!("WebsocketOnly");
trace!("WebsocketOnly. redirect_public_url not set");
(
StatusCode::BAD_REQUEST,
JsonRpcErrorData {
message: "redirect_public_url not set. only websockets work here".into(),
message: "only websockets work here".into(),
code: StatusCode::BAD_REQUEST.as_u16().into(),
data: None,
},

@ -270,6 +270,7 @@ impl Web3Rpcs {
authorization: &Arc<Authorization>,
hash: &H256,
rpc: Option<&Arc<Web3Rpc>>,
max_tries: Option<usize>,
max_wait: Option<Duration>,
) -> Web3ProxyResult<Web3ProxyBlock> {
// first, try to get the hash from our cache
@ -293,7 +294,7 @@ impl Web3Rpcs {
// block not in cache. we need to ask an rpc for it
let get_block_params = (*hash, false);
let block: Option<ArcBlock> = if let Some(rpc) = rpc {
let mut block: Option<ArcBlock> = if let Some(rpc) = rpc {
// ask a specific rpc
// TODO: request_with_metadata would probably be better than authorized_request
rpc.authorized_request::<_, Option<ArcBlock>>(
@ -301,19 +302,26 @@ impl Web3Rpcs {
&get_block_params,
authorization,
None,
max_tries,
max_wait,
)
.await?
} else {
// ask any rpc
None
};
if block.is_none() {
// try by asking any rpc
// TODO: retry if "Requested data is not available"
// TODO: request_with_metadata instead of internal_request
self.internal_request::<_, Option<ArcBlock>>(
"eth_getBlockByHash",
&get_block_params,
max_wait,
)
.await?
block = self
.internal_request::<_, Option<ArcBlock>>(
"eth_getBlockByHash",
&get_block_params,
max_tries,
max_wait,
)
.await?;
};
match block {
@ -321,8 +329,7 @@ impl Web3Rpcs {
let block = self.try_cache_block(block.try_into()?, false).await?;
Ok(block)
}
// TODO: better error. some blocks are known, just not this one
None => Err(Web3ProxyError::NoBlocksKnown),
None => Err(Web3ProxyError::UnknownBlockHash(*hash)),
}
}
@ -384,7 +391,9 @@ impl Web3Rpcs {
if let Some(block_hash) = self.blocks_by_number.get(num) {
// TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set
// TODO: configurable max wait and rpc
let block = self.block(authorization, &block_hash, None, None).await?;
let block = self
.block(authorization, &block_hash, None, Some(3), None)
.await?;
return Ok((block, block_depth));
}
@ -392,7 +401,12 @@ impl Web3Rpcs {
// block number not in cache. we need to ask an rpc for it
// TODO: this error is too broad
let response = self
.internal_request::<_, Option<ArcBlock>>("eth_getBlockByNumber", &(*num, false), None)
.internal_request::<_, Option<ArcBlock>>(
"eth_getBlockByNumber",
&(*num, false),
Some(3),
None,
)
.await?
.ok_or(Web3ProxyError::NoBlocksKnown)?;

@ -873,7 +873,7 @@ impl ConsensusFinder {
let parent_hash = block_to_check.parent_hash();
// TODO: i flip flop on passing rpc to this or not
match web3_rpcs
.block(authorization, parent_hash, Some(rpc), None)
.block(authorization, parent_hash, Some(rpc), None, None)
.await
{
Ok(parent_block) => block_to_check = parent_block,

@ -731,19 +731,22 @@ impl Web3Rpcs {
&self,
method: &str,
params: &P,
max_tries: Option<usize>,
max_wait: Option<Duration>,
) -> Web3ProxyResult<R> {
// TODO: no request_metadata means we won't have stats on this internal request.
self.request_with_metadata(method, params, None, max_wait, None, None)
self.request_with_metadata(method, params, None, max_tries, max_wait, None, None)
.await
}
/// Make a request with stat tracking.
#[allow(clippy::too_many_arguments)]
pub async fn request_with_metadata<P: JsonRpcParams, R: JsonRpcResultData>(
&self,
method: &str,
params: &P,
request_metadata: Option<&Arc<RequestMetadata>>,
mut max_tries: Option<usize>,
max_wait: Option<Duration>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
@ -764,9 +767,19 @@ impl Web3Rpcs {
loop {
if let Some(max_wait) = max_wait {
if start.elapsed() > max_wait {
trace!("max_wait exceeded");
break;
}
}
if let Some(max_tries) = max_tries.as_mut() {
if *max_tries == 0 {
trace!("max_tries exceeded");
break;
}
// prepare for the next loop
*max_tries -= 1;
}
match self
.wait_for_best_rpc(
@ -1149,11 +1162,13 @@ impl Web3Rpcs {
Err(Web3ProxyError::NoServersSynced)
}
#[allow(clippy::too_many_arguments)]
pub async fn try_proxy_connection<P: JsonRpcParams, R: JsonRpcResultData>(
&self,
method: &str,
params: &P,
request_metadata: Option<&Arc<RequestMetadata>>,
max_tries: Option<usize>,
max_wait: Option<Duration>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
@ -1166,6 +1181,7 @@ impl Web3Rpcs {
method,
params,
request_metadata,
max_tries,
max_wait,
min_block_needed,
max_block_needed,

@ -317,6 +317,7 @@ impl Web3Rpc {
&None::<()>,
// errors here are expected, so keep the level low
Some(Level::DEBUG.into()),
Some(2),
Some(Duration::from_secs(5)),
)
.await
@ -341,6 +342,7 @@ impl Web3Rpc {
)),
// errors here are expected, so keep the level low
Some(Level::TRACE.into()),
Some(2),
Some(Duration::from_secs(5)),
)
.await;
@ -431,6 +433,7 @@ impl Web3Rpc {
"eth_chainId",
&None::<()>,
Some(Level::TRACE.into()),
Some(2),
Some(Duration::from_secs(5)),
)
.await?;
@ -555,6 +558,7 @@ impl Web3Rpc {
"eth_getTransactionByHash",
&(txid,),
error_handler,
Some(2),
Some(Duration::from_secs(5)),
)
.await?
@ -577,6 +581,7 @@ impl Web3Rpc {
"eth_getCode",
&(to, block_number),
error_handler,
Some(2),
Some(Duration::from_secs(5)),
)
.await?;
@ -822,6 +827,7 @@ impl Web3Rpc {
&("latest", false),
&authorization,
Some(Level::WARN.into()),
Some(2),
Some(Duration::from_secs(5)),
)
.await;
@ -858,6 +864,7 @@ impl Web3Rpc {
&("latest", false),
&authorization,
Some(Level::WARN.into()),
Some(2),
Some(Duration::from_secs(5)),
)
.await;
@ -1060,12 +1067,20 @@ impl Web3Rpc {
method: &str,
params: &P,
error_handler: Option<RequestErrorHandler>,
max_tries: Option<usize>,
max_wait: Option<Duration>,
) -> Web3ProxyResult<R> {
let authorization = Default::default();
self.authorized_request(method, params, &authorization, error_handler, max_wait)
.await
self.authorized_request(
method,
params,
&authorization,
error_handler,
max_tries,
max_wait,
)
.await
}
pub async fn authorized_request<P: JsonRpcParams, R: JsonRpcResultData>(
@ -1074,16 +1089,42 @@ impl Web3Rpc {
params: &P,
authorization: &Arc<Authorization>,
error_handler: Option<RequestErrorHandler>,
max_tries: Option<usize>,
max_wait: Option<Duration>,
) -> Web3ProxyResult<R> {
// TODO: take max_wait as a function argument?
let x = self
.wait_for_request_handle(authorization, max_wait, error_handler)
.await?
.request::<P, R>(method, params)
.await?;
let mut tries = max_tries.unwrap_or(1);
Ok(x)
let mut last_error: Option<Web3ProxyError> = None;
while tries > 0 {
tries -= 1;
let handle = match self
.wait_for_request_handle(authorization, max_wait, error_handler)
.await
{
Ok(x) => x,
Err(err) => {
last_error = Some(err);
continue;
}
};
match handle.request::<P, R>(method, params).await {
Ok(x) => return Ok(x),
Err(err) => {
last_error = Some(err.into());
continue;
}
}
}
if let Some(last_error) = last_error {
return Err(last_error);
}
Err(anyhow::anyhow!("authorized_request failed in an unexpected way").into())
}
}