diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 9c8150db..f0f7e765 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -55,7 +55,7 @@ use std::sync::{atomic, Arc}; use std::time::Duration; use tokio::sync::{broadcast, watch, Semaphore}; use tokio::task::JoinHandle; -use tokio::time::timeout; +use tokio::time::{sleep, timeout}; use tracing::{error, info, trace, warn, Level}; // TODO: make this customizable? @@ -1136,6 +1136,7 @@ impl Web3ProxyApp { // TODO: trace log request.params before we send them to _proxy_request_with_caching which might modify them + // TODO: I think we have sufficient retries elsewhere and this will just slow us down. let mut tries = 3; let mut last_code_and_response = None; while tries > 0 { @@ -1144,6 +1145,7 @@ impl Web3ProxyApp { &request.method, &mut request.params, head_block_num, + Some(2), &request_metadata, ) .await @@ -1158,11 +1160,14 @@ impl Web3ProxyApp { break; } + tries -= 1; + // TODO: emit a stat? // TODO: only log params in development - warn!(method=%request.method, params=?request.params, response=?last_code_and_response, "request failed"); + warn!(method=%request.method, params=%request.params, response=?last_code_and_response, "request failed ({} tries remain)", tries); - tries -= 1; + // TODO: sleep a randomized amount of time? + sleep(Duration::from_millis(10)).await; } let (code, response) = last_code_and_response.expect("there should always be a response"); @@ -1184,6 +1189,7 @@ impl Web3ProxyApp { method: &str, params: &mut serde_json::Value, head_block_num: Option, + max_tries: Option, request_metadata: &Arc, ) -> Web3ProxyResult>> { // TODO: don't clone into a new string? @@ -1299,6 +1305,7 @@ impl Web3ProxyApp { method, params, Some(request_metadata), + max_tries, Some(Duration::from_secs(30)), None, None, @@ -1340,6 +1347,7 @@ impl Web3ProxyApp { method, params, Some(request_metadata), + max_tries, Some(Duration::from_secs(30)), None, None, @@ -1373,6 +1381,7 @@ impl Web3ProxyApp { method, params, Some(request_metadata), + max_tries, Some(Duration::from_secs(30)), None, None, @@ -1397,7 +1406,9 @@ impl Web3ProxyApp { method, params, Some(request_metadata), + max_tries, Some(Duration::from_secs(30)), + // TODO: should this be block 0 instead? Some(&U64::one()), None, ) @@ -1642,7 +1653,7 @@ impl Web3ProxyApp { let request_block = self .balanced_rpcs - .block(&authorization, &request_block_hash, None, None) + .block(&authorization, &request_block_hash, None, Some(3), None) .await? .block; @@ -1672,7 +1683,7 @@ impl Web3ProxyApp { let from_block = self .balanced_rpcs - .block(&authorization, &from_block_hash, None, None) + .block(&authorization, &from_block_hash, None, Some(3), None) .await? .block; @@ -1683,7 +1694,7 @@ impl Web3ProxyApp { let to_block = self .balanced_rpcs - .block(&authorization, &to_block_hash, None, None) + .block(&authorization, &to_block_hash, None, Some(3), None) .await? .block; @@ -1717,6 +1728,7 @@ impl Web3ProxyApp { method, params, Some(request_metadata), + max_tries, Some(max_wait), from_block_num.as_ref(), to_block_num.as_ref(), @@ -1743,6 +1755,7 @@ impl Web3ProxyApp { method, params, Some(request_metadata), + max_tries, Some(max_wait), None, None, diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index 2836f542..54d6dbbf 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -76,7 +76,7 @@ pub async fn clean_block_number( serde_json::from_value(block_hash).context("decoding blockHash")?; let block = rpcs - .block(authorization, &block_hash, None, None) + .block(authorization, &block_hash, None, Some(3), None) .await .context("fetching block number from hash")?; diff --git a/web3_proxy/src/errors.rs b/web3_proxy/src/errors.rs index 937ea310..5f3fe543 100644 --- a/web3_proxy/src/errors.rs +++ b/web3_proxy/src/errors.rs @@ -13,6 +13,7 @@ use axum::{ }; use derive_more::{Display, Error, From}; use ethers::prelude::ContractError; +use ethers::types::{H256, U64}; use http::header::InvalidHeaderValue; use http::uri::InvalidUri; use ipnet::AddrParseError; @@ -147,7 +148,14 @@ pub enum Web3ProxyError { #[error(ignore)] Timeout(Option), UlidDecode(ulid::DecodeError), - UnknownBlockNumber, + #[error(ignore)] + UnknownBlockHash(H256), + #[display(fmt = "known: {known}, unknown: {unknown}")] + #[error(ignore)] + UnknownBlockNumber { + known: U64, + unknown: U64, + }, UnknownKey, UserAgentRequired, #[error(ignore)] @@ -952,13 +960,28 @@ impl Web3ProxyError { }, ) } - Self::UnknownBlockNumber => { - error!("UnknownBlockNumber"); + Self::UnknownBlockHash(hash) => { + debug!(%hash, "UnknownBlockHash"); ( - StatusCode::BAD_GATEWAY, + StatusCode::OK, JsonRpcErrorData { - message: "no servers synced. unknown eth_blockNumber".into(), - code: StatusCode::BAD_GATEWAY.as_u16().into(), + message: format!("unknown block hash ({})", hash).into(), + code: None, + data: None, + }, + ) + } + Self::UnknownBlockNumber { known, unknown } => { + debug!(%known, %unknown, "UnknownBlockNumber"); + ( + StatusCode::OK, + JsonRpcErrorData { + message: format!( + "unknown block number. known: {}. unknown: {}", + known, unknown + ) + .into(), + code: None, data: None, }, ) @@ -1039,11 +1062,11 @@ impl Web3ProxyError { ) } Self::WebsocketOnly => { - trace!("WebsocketOnly"); + trace!("WebsocketOnly. redirect_public_url not set"); ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: "redirect_public_url not set. only websockets work here".into(), + message: "only websockets work here".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 5a9250be..3ed641f6 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -270,6 +270,7 @@ impl Web3Rpcs { authorization: &Arc, hash: &H256, rpc: Option<&Arc>, + max_tries: Option, max_wait: Option, ) -> Web3ProxyResult { // first, try to get the hash from our cache @@ -293,7 +294,7 @@ impl Web3Rpcs { // block not in cache. we need to ask an rpc for it let get_block_params = (*hash, false); - let block: Option = if let Some(rpc) = rpc { + let mut block: Option = if let Some(rpc) = rpc { // ask a specific rpc // TODO: request_with_metadata would probably be better than authorized_request rpc.authorized_request::<_, Option>( @@ -301,19 +302,26 @@ impl Web3Rpcs { &get_block_params, authorization, None, + max_tries, max_wait, ) .await? } else { - // ask any rpc + None + }; + + if block.is_none() { + // try by asking any rpc // TODO: retry if "Requested data is not available" // TODO: request_with_metadata instead of internal_request - self.internal_request::<_, Option>( - "eth_getBlockByHash", - &get_block_params, - max_wait, - ) - .await? + block = self + .internal_request::<_, Option>( + "eth_getBlockByHash", + &get_block_params, + max_tries, + max_wait, + ) + .await?; }; match block { @@ -321,8 +329,7 @@ impl Web3Rpcs { let block = self.try_cache_block(block.try_into()?, false).await?; Ok(block) } - // TODO: better error. some blocks are known, just not this one - None => Err(Web3ProxyError::NoBlocksKnown), + None => Err(Web3ProxyError::UnknownBlockHash(*hash)), } } @@ -384,7 +391,9 @@ impl Web3Rpcs { if let Some(block_hash) = self.blocks_by_number.get(num) { // TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set // TODO: configurable max wait and rpc - let block = self.block(authorization, &block_hash, None, None).await?; + let block = self + .block(authorization, &block_hash, None, Some(3), None) + .await?; return Ok((block, block_depth)); } @@ -392,7 +401,12 @@ impl Web3Rpcs { // block number not in cache. we need to ask an rpc for it // TODO: this error is too broad let response = self - .internal_request::<_, Option>("eth_getBlockByNumber", &(*num, false), None) + .internal_request::<_, Option>( + "eth_getBlockByNumber", + &(*num, false), + Some(3), + None, + ) .await? .ok_or(Web3ProxyError::NoBlocksKnown)?; diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index e7b19715..e4815f49 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -873,7 +873,7 @@ impl ConsensusFinder { let parent_hash = block_to_check.parent_hash(); // TODO: i flip flop on passing rpc to this or not match web3_rpcs - .block(authorization, parent_hash, Some(rpc), None) + .block(authorization, parent_hash, Some(rpc), None, None) .await { Ok(parent_block) => block_to_check = parent_block, diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 1119d1bd..6479a1c3 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -731,19 +731,22 @@ impl Web3Rpcs { &self, method: &str, params: &P, + max_tries: Option, max_wait: Option, ) -> Web3ProxyResult { // TODO: no request_metadata means we won't have stats on this internal request. - self.request_with_metadata(method, params, None, max_wait, None, None) + self.request_with_metadata(method, params, None, max_tries, max_wait, None, None) .await } /// Make a request with stat tracking. + #[allow(clippy::too_many_arguments)] pub async fn request_with_metadata( &self, method: &str, params: &P, request_metadata: Option<&Arc>, + mut max_tries: Option, max_wait: Option, min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, @@ -764,9 +767,19 @@ impl Web3Rpcs { loop { if let Some(max_wait) = max_wait { if start.elapsed() > max_wait { + trace!("max_wait exceeded"); break; } } + if let Some(max_tries) = max_tries.as_mut() { + if *max_tries == 0 { + trace!("max_tries exceeded"); + break; + } + + // prepare for the next loop + *max_tries -= 1; + } match self .wait_for_best_rpc( @@ -1149,11 +1162,13 @@ impl Web3Rpcs { Err(Web3ProxyError::NoServersSynced) } + #[allow(clippy::too_many_arguments)] pub async fn try_proxy_connection( &self, method: &str, params: &P, request_metadata: Option<&Arc>, + max_tries: Option, max_wait: Option, min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, @@ -1166,6 +1181,7 @@ impl Web3Rpcs { method, params, request_metadata, + max_tries, max_wait, min_block_needed, max_block_needed, diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 72747d49..21d4fd05 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -317,6 +317,7 @@ impl Web3Rpc { &None::<()>, // error here are expected, so keep the level low Some(Level::DEBUG.into()), + Some(2), Some(Duration::from_secs(5)), ) .await @@ -341,6 +342,7 @@ impl Web3Rpc { )), // error here are expected, so keep the level low Some(Level::TRACE.into()), + Some(2), Some(Duration::from_secs(5)), ) .await; @@ -431,6 +433,7 @@ impl Web3Rpc { "eth_chainId", &None::<()>, Some(Level::TRACE.into()), + Some(2), Some(Duration::from_secs(5)), ) .await?; @@ -555,6 +558,7 @@ impl Web3Rpc { "eth_getTransactionByHash", &(txid,), error_handler, + Some(2), Some(Duration::from_secs(5)), ) .await? @@ -577,6 +581,7 @@ impl Web3Rpc { "eth_getCode", &(to, block_number), error_handler, + Some(2), Some(Duration::from_secs(5)), ) .await?; @@ -822,6 +827,7 @@ impl Web3Rpc { &("latest", false), &authorization, Some(Level::WARN.into()), + Some(2), Some(Duration::from_secs(5)), ) .await; @@ -858,6 +864,7 @@ impl Web3Rpc { &("latest", false), &authorization, Some(Level::WARN.into()), + Some(2), Some(Duration::from_secs(5)), ) .await; @@ -1060,12 +1067,20 @@ impl Web3Rpc { method: &str, params: &P, error_handler: Option, + max_tries: Option, max_wait: Option, ) -> Web3ProxyResult { let authorization = Default::default(); - self.authorized_request(method, params, &authorization, error_handler, max_wait) - .await + self.authorized_request( + method, + params, + &authorization, + error_handler, + max_tries, + max_wait, + ) + .await } pub async fn authorized_request( @@ -1074,16 +1089,42 @@ impl Web3Rpc { params: &P, authorization: &Arc, error_handler: Option, + max_tries: Option, max_wait: Option, ) -> Web3ProxyResult { // TODO: take max_wait as a function argument? - let x = self - .wait_for_request_handle(authorization, max_wait, error_handler) - .await? - .request::(method, params) - .await?; + let mut tries = max_tries.unwrap_or(1); - Ok(x) + let mut last_error: Option = None; + + while tries > 0 { + tries -= 1; + + let handle = match self + .wait_for_request_handle(authorization, max_wait, error_handler) + .await + { + Ok(x) => x, + Err(err) => { + last_error = Some(err); + continue; + } + }; + + match handle.request::(method, params).await { + Ok(x) => return Ok(x), + Err(err) => { + last_error = Some(err.into()); + continue; + } + } + } + + if let Some(last_error) = last_error { + return Err(last_error); + } + + Err(anyhow::anyhow!("authorized_request failed in an unexpected way").into()) } }