From 5fbcb75157cd012b723c9d2b37bb7947e5e05aa5 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 3 Aug 2023 00:22:45 -0700 Subject: [PATCH] Revert "remove max_tries for now" This reverts commit e0d7d11398163bb8554cd6736d1dbc7f55caaa24. --- web3_proxy/src/app/mod.rs | 11 +++-- web3_proxy/src/block_number.rs | 8 ++-- web3_proxy/src/rpcs/blockchain.rs | 18 +++++--- web3_proxy/src/rpcs/consensus.rs | 2 +- web3_proxy/src/rpcs/many.rs | 73 +++++++++++++++++++++++++------ web3_proxy/src/rpcs/one.rs | 56 +++++++++++++++++++++--- 6 files changed, 134 insertions(+), 34 deletions(-) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 0ad6cfb2..5f64eb19 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -1169,6 +1169,7 @@ impl Web3ProxyApp { &request.method, &mut request.params, head_block, + Some(2), &request_metadata, ) .await @@ -1223,6 +1224,7 @@ impl Web3ProxyApp { method: &str, params: &mut serde_json::Value, head_block: Option<&Web3ProxyBlock>, + max_tries: Option, request_metadata: &Arc, ) -> Web3ProxyResult>> { // TODO: don't clone into a new string? @@ -1336,6 +1338,7 @@ impl Web3ProxyApp { method, params, Some(request_metadata), + max_tries, Some(Duration::from_secs(30)), None, None, @@ -1375,6 +1378,7 @@ impl Web3ProxyApp { method, params, Some(request_metadata), + max_tries, Some(Duration::from_secs(30)), None, None, @@ -1408,6 +1412,7 @@ impl Web3ProxyApp { method, params, Some(request_metadata), + max_tries, Some(Duration::from_secs(30)), None, None, @@ -1432,7 +1437,7 @@ impl Web3ProxyApp { method, params, Some(request_metadata), - + max_tries, Some(Duration::from_secs(30)), // TODO: should this be block 0 instead? Some(&U64::one()), @@ -1734,7 +1739,7 @@ impl Web3ProxyApp { method, params, Some(request_metadata), - + max_tries, Some(backend_request_timetout), from_block_num.as_ref(), to_block_num.as_ref(), @@ -1770,7 +1775,7 @@ impl Web3ProxyApp { method, params, Some(request_metadata), - + max_tries, Some(backend_request_timetout), None, None, diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index 27a00aac..aba4f11d 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -98,7 +98,7 @@ pub async fn clean_block_number( serde_json::from_value(block_hash).context("decoding blockHash")?; let block = rpcs - .block(&block_hash, None, None) + .block(&block_hash, None, Some(3), None) .await .context("fetching block number from hash")?; @@ -117,7 +117,7 @@ pub async fn clean_block_number( .context("fetching block hash from number")?; let block = rpcs - .block(&block_hash, None, None) + .block(&block_hash, None, Some(3), None) .await .context("fetching block from hash")?; @@ -138,7 +138,7 @@ pub async fn clean_block_number( .context("fetching block hash from number")?; let block = rpcs - .block(&block_hash, None, None) + .block(&block_hash, None, Some(3), None) .await .context("fetching block from hash")?; @@ -146,7 +146,7 @@ pub async fn clean_block_number( } } else if let Ok(block_hash) = serde_json::from_value::(x.clone()) { let block = rpcs - .block(&block_hash, None, None) + .block(&block_hash, None, Some(3), None) .await .context("fetching block number from hash")?; diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 009fd410..377af889 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -267,6 +267,7 @@ impl Web3Rpcs { &self, hash: &H256, rpc: Option<&Arc>, + max_tries: Option, max_wait: Option, ) -> Web3ProxyResult { // first, try to get the hash from our cache @@ -284,7 +285,7 @@ impl Web3Rpcs { } // hashes don't match! this block must be in the middle of being uncled - // TODO: check known uncles. clear uncle caches + // TODO: check known uncles } if hash == &H256::zero() { @@ -302,11 +303,10 @@ impl Web3Rpcs { "eth_getBlockByHash", &get_block_params, None, + max_tries, max_wait, ) - .await - .ok() - .flatten() + .await? } else { None }; @@ -319,6 +319,7 @@ impl Web3Rpcs { .internal_request::<_, Option>( "eth_getBlockByHash", &get_block_params, + max_tries, max_wait, ) .await?; @@ -390,7 +391,7 @@ impl Web3Rpcs { if let Some(block_hash) = self.blocks_by_number.get(num) { // TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set // TODO: configurable max wait and rpc - let block = self.block(&block_hash, None, None).await?; + let block = self.block(&block_hash, None, Some(3), None).await?; return Ok((block, block_depth)); } @@ -398,7 +399,12 @@ impl Web3Rpcs { // block number not in cache. we need to ask an rpc for it // TODO: this error is too broad let response = self - .internal_request::<_, Option>("eth_getBlockByNumber", &(*num, false), None) + .internal_request::<_, Option>( + "eth_getBlockByNumber", + &(*num, false), + Some(3), + None, + ) .await? .ok_or(Web3ProxyError::NoBlocksKnown)?; diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 6a4a76ea..76bf16ed 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -860,7 +860,7 @@ impl ConsensusFinder { let parent_hash = block_to_check.parent_hash(); - match web3_rpcs.block(parent_hash, Some(rpc), None).await { + match web3_rpcs.block(parent_hash, Some(rpc), Some(1), None).await { Ok(parent_block) => block_to_check = parent_block, Err(err) => { debug!( diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index a1d33cb3..bc9441d9 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -32,7 +32,7 @@ use std::sync::Arc; use tokio::select; use tokio::sync::{mpsc, watch}; use tokio::time::{sleep, sleep_until, Duration, Instant}; -use tracing::{debug, error, info, instrument, trace, warn}; +use tracing::{debug, error, info, trace, warn}; /// A collection of web3 connections. Sends requests either the current best server or all servers. #[derive(From)] @@ -499,7 +499,6 @@ impl Web3Rpcs { } } - #[instrument(level = "trace")] pub async fn wait_for_best_rpc( &self, request_metadata: Option<&Arc>, @@ -522,9 +521,7 @@ impl Web3Rpcs { let mut potential_rpcs = Vec::new(); - // TODO: max loop? - for attempt in 0..32 { - trace!(attempt); + loop { // TODO: need a change so that protected and 4337 rpcs set watch_consensus_rpcs on start let ranked_rpcs: Option> = watch_ranked_rpcs.borrow_and_update().clone(); @@ -538,7 +535,6 @@ impl Web3Rpcs { .all() .iter() .filter(|rpc| { - // TODO: instrument this? ranked_rpcs.rpc_will_work_now( skip_rpcs, min_block_needed, @@ -589,23 +585,19 @@ impl Web3Rpcs { ShouldWaitForBlock::Wait { .. } => select! { _ = watch_ranked_rpcs.changed() => { // no need to borrow_and_update because we do that at the top of the loop - trace!("watch ranked rpcs changed"); }, _ = sleep_until(start + max_wait) => break, }, } } } else if let Some(max_wait) = max_wait { - trace!(max_wait = max_wait.as_secs_f32(), "no potential rpcs"); select! { _ = watch_ranked_rpcs.changed() => { // no need to borrow_and_update because we do that at the top of the loop - trace!("watch ranked rpcs changed"); }, _ = sleep_until(start + max_wait) => break, } } else { - trace!("no potential rpcs and set to not wait"); break; } @@ -731,11 +723,64 @@ impl Web3Rpcs { &self, method: &str, params: &P, + max_tries: Option, max_wait: Option, ) -> Web3ProxyResult { // TODO: no request_metadata means we won't have stats on this internal request. - self.request_with_metadata(method, params, None, max_wait, None, None) - .await + self.request_with_metadata_and_retries( + method, params, None, max_tries, max_wait, None, None, + ) + .await + } + + /// Make a request with stat tracking. + #[allow(clippy::too_many_arguments)] + pub async fn request_with_metadata_and_retries( + &self, + method: &str, + params: &P, + request_metadata: Option<&Arc>, + max_tries: Option, + max_wait: Option, + min_block_needed: Option<&U64>, + max_block_needed: Option<&U64>, + ) -> Web3ProxyResult { + let mut tries = max_tries.unwrap_or(1); + + let mut last_error = None; + + while tries > 0 { + tries -= 1; + + match self + .request_with_metadata( + method, + params, + request_metadata, + max_wait, + min_block_needed, + max_block_needed, + ) + .await + { + Ok(x) => return Ok(x), + Err(Web3ProxyError::JsonRpcErrorData(err)) => { + // TODO: retry some of these? i think request_with_metadata is already smart enough though + return Err(err.into()); + } + Err(err) => { + // TODO: only log params in dev + warn!(rpc=%self, %method, ?params, ?err, %tries, "retry-able error"); + last_error = Some(err) + } + } + } + + if let Some(err) = last_error { + return Err(err); + } + + Err(anyhow::anyhow!("no response, but no error either. this is a bug").into()) } /// Make a request with stat tracking. @@ -1214,6 +1259,7 @@ impl Web3Rpcs { method: &str, params: &P, request_metadata: Option<&Arc>, + max_tries: Option, max_wait: Option, min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, @@ -1222,10 +1268,11 @@ impl Web3Rpcs { match proxy_mode { ProxyMode::Debug | ProxyMode::Best => { - self.request_with_metadata( + self.request_with_metadata_and_retries( method, params, request_metadata, + max_tries, max_wait, min_block_needed, max_block_needed, diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 2622ec94..5ab1de7b 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -303,6 +303,7 @@ impl Web3Rpc { &[(); 0], // error here are expected, so keep the level low Some(Level::DEBUG.into()), + Some(2), Some(Duration::from_secs(5)), ) .await @@ -327,6 +328,7 @@ impl Web3Rpc { )), // error here are expected, so keep the level low Some(Level::TRACE.into()), + Some(2), Some(Duration::from_secs(5)), ) .await; @@ -417,6 +419,7 @@ impl Web3Rpc { "eth_chainId", &[(); 0], Some(Level::TRACE.into()), + Some(2), Some(Duration::from_secs(5)), ) .await?; @@ -543,6 +546,7 @@ impl Web3Rpc { "eth_getTransactionByHash", &(txid,), error_handler, + Some(2), Some(Duration::from_secs(5)), ) .await? @@ -565,6 +569,7 @@ impl Web3Rpc { "eth_getCode", &(to, block_number), error_handler, + Some(2), Some(Duration::from_secs(5)), ) .await?; @@ -798,6 +803,7 @@ impl Web3Rpc { "eth_getBlockByNumber", &("latest", false), error_handler, + Some(2), Some(Duration::from_secs(5)), ) .await; @@ -833,6 +839,7 @@ impl Web3Rpc { "eth_getBlockByNumber", &("latest", false), error_handler, + Some(2), Some(Duration::from_secs(5)), ) .await; @@ -972,12 +979,20 @@ impl Web3Rpc { method: &str, params: &P, error_handler: Option, + max_tries: Option, max_wait: Option, ) -> Web3ProxyResult { let authorization = Default::default(); - self.authorized_request(method, params, &authorization, error_handler, max_wait) - .await + self.authorized_request( + method, + params, + &authorization, + error_handler, + max_tries, + max_wait, + ) + .await } pub async fn authorized_request( @@ -986,15 +1001,42 @@ impl Web3Rpc { params: &P, authorization: &Arc, error_handler: Option, + max_tries: Option, max_wait: Option, ) -> Web3ProxyResult { - let handle = self - .wait_for_request_handle(authorization, max_wait, error_handler) - .await?; + // TODO: take max_wait as a function argument? + let mut tries = max_tries.unwrap_or(1); - let x = handle.request::(method, params).await?; + let mut last_error: Option = None; - Ok(x) + while tries > 0 { + tries -= 1; + + let handle = match self + .wait_for_request_handle(authorization, max_wait, error_handler) + .await + { + Ok(x) => x, + Err(err) => { + last_error = Some(err); + continue; + } + }; + + match handle.request::(method, params).await { + Ok(x) => return Ok(x), + Err(err) => { + last_error = Some(err.into()); + continue; + } + } + } + + if let Some(last_error) = last_error { + return Err(last_error); + } + + Err(anyhow::anyhow!("authorized_request failed in an unexpected way").into()) } }