From e0d7d11398163bb8554cd6736d1dbc7f55caaa24 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 2 Aug 2023 22:07:28 -0700 Subject: [PATCH] remove max_tries for now i think this is causing problematic looping --- web3_proxy/src/app/mod.rs | 11 ++--- web3_proxy/src/block_number.rs | 8 ++-- web3_proxy/src/rpcs/blockchain.rs | 18 +++----- web3_proxy/src/rpcs/consensus.rs | 2 +- web3_proxy/src/rpcs/many.rs | 73 ++++++------------------------- web3_proxy/src/rpcs/one.rs | 56 +++--------------------- 6 files changed, 34 insertions(+), 134 deletions(-) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 5f64eb19..0ad6cfb2 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -1169,7 +1169,6 @@ impl Web3ProxyApp { &request.method, &mut request.params, head_block, - Some(2), &request_metadata, ) .await @@ -1224,7 +1223,6 @@ impl Web3ProxyApp { method: &str, params: &mut serde_json::Value, head_block: Option<&Web3ProxyBlock>, - max_tries: Option, request_metadata: &Arc, ) -> Web3ProxyResult>> { // TODO: don't clone into a new string? @@ -1338,7 +1336,6 @@ impl Web3ProxyApp { method, params, Some(request_metadata), - max_tries, Some(Duration::from_secs(30)), None, None, @@ -1378,7 +1375,6 @@ impl Web3ProxyApp { method, params, Some(request_metadata), - max_tries, Some(Duration::from_secs(30)), None, None, @@ -1412,7 +1408,6 @@ impl Web3ProxyApp { method, params, Some(request_metadata), - max_tries, Some(Duration::from_secs(30)), None, None, @@ -1437,7 +1432,7 @@ impl Web3ProxyApp { method, params, Some(request_metadata), - max_tries, + Some(Duration::from_secs(30)), // TODO: should this be block 0 instead? Some(&U64::one()), @@ -1739,7 +1734,7 @@ impl Web3ProxyApp { method, params, Some(request_metadata), - max_tries, + Some(backend_request_timetout), from_block_num.as_ref(), to_block_num.as_ref(), @@ -1775,7 +1770,7 @@ impl Web3ProxyApp { method, params, Some(request_metadata), - max_tries, + Some(backend_request_timetout), None, None, diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index aba4f11d..27a00aac 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -98,7 +98,7 @@ pub async fn clean_block_number( serde_json::from_value(block_hash).context("decoding blockHash")?; let block = rpcs - .block(&block_hash, None, Some(3), None) + .block(&block_hash, None, None) .await .context("fetching block number from hash")?; @@ -117,7 +117,7 @@ pub async fn clean_block_number( .context("fetching block hash from number")?; let block = rpcs - .block(&block_hash, None, Some(3), None) + .block(&block_hash, None, None) .await .context("fetching block from hash")?; @@ -138,7 +138,7 @@ pub async fn clean_block_number( .context("fetching block hash from number")?; let block = rpcs - .block(&block_hash, None, Some(3), None) + .block(&block_hash, None, None) .await .context("fetching block from hash")?; @@ -146,7 +146,7 @@ pub async fn clean_block_number( } } else if let Ok(block_hash) = serde_json::from_value::(x.clone()) { let block = rpcs - .block(&block_hash, None, Some(3), None) + .block(&block_hash, None, None) .await .context("fetching block number from hash")?; diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 377af889..009fd410 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -267,7 +267,6 @@ impl Web3Rpcs { &self, hash: &H256, rpc: Option<&Arc>, - max_tries: Option, max_wait: Option, ) -> Web3ProxyResult { // first, try to get the hash from our cache @@ -285,7 +284,7 @@ impl Web3Rpcs { } // hashes don't match! this block must be in the middle of being uncled - // TODO: check known uncles + // TODO: check known uncles. clear uncle caches } if hash == &H256::zero() { @@ -303,10 +302,11 @@ impl Web3Rpcs { "eth_getBlockByHash", &get_block_params, None, - max_tries, max_wait, ) - .await? + .await + .ok() + .flatten() } else { None }; @@ -319,7 +319,6 @@ impl Web3Rpcs { .internal_request::<_, Option>( "eth_getBlockByHash", &get_block_params, - max_tries, max_wait, ) .await?; @@ -391,7 +390,7 @@ impl Web3Rpcs { if let Some(block_hash) = self.blocks_by_number.get(num) { // TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set // TODO: configurable max wait and rpc - let block = self.block(&block_hash, None, Some(3), None).await?; + let block = self.block(&block_hash, None, None).await?; return Ok((block, block_depth)); } @@ -399,12 +398,7 @@ impl Web3Rpcs { // block number not in cache. we need to ask an rpc for it // TODO: this error is too broad let response = self - .internal_request::<_, Option>( - "eth_getBlockByNumber", - &(*num, false), - Some(3), - None, - ) + .internal_request::<_, Option>("eth_getBlockByNumber", &(*num, false), None) .await? .ok_or(Web3ProxyError::NoBlocksKnown)?; diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 76bf16ed..6a4a76ea 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -860,7 +860,7 @@ impl ConsensusFinder { let parent_hash = block_to_check.parent_hash(); - match web3_rpcs.block(parent_hash, Some(rpc), Some(1), None).await { + match web3_rpcs.block(parent_hash, Some(rpc), None).await { Ok(parent_block) => block_to_check = parent_block, Err(err) => { debug!( diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index bc9441d9..a1d33cb3 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -32,7 +32,7 @@ use std::sync::Arc; use tokio::select; use tokio::sync::{mpsc, watch}; use tokio::time::{sleep, sleep_until, Duration, Instant}; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, error, info, instrument, trace, warn}; /// A collection of web3 connections. Sends requests either the current best server or all servers. #[derive(From)] @@ -499,6 +499,7 @@ impl Web3Rpcs { } } + #[instrument(level = "trace")] pub async fn wait_for_best_rpc( &self, request_metadata: Option<&Arc>, @@ -521,7 +522,9 @@ impl Web3Rpcs { let mut potential_rpcs = Vec::new(); - loop { + // TODO: max loop? + for attempt in 0..32 { + trace!(attempt); // TODO: need a change so that protected and 4337 rpcs set watch_consensus_rpcs on start let ranked_rpcs: Option> = watch_ranked_rpcs.borrow_and_update().clone(); @@ -535,6 +538,7 @@ impl Web3Rpcs { .all() .iter() .filter(|rpc| { + // TODO: instrument this? ranked_rpcs.rpc_will_work_now( skip_rpcs, min_block_needed, @@ -585,19 +589,23 @@ impl Web3Rpcs { ShouldWaitForBlock::Wait { .. } => select! { _ = watch_ranked_rpcs.changed() => { // no need to borrow_and_update because we do that at the top of the loop + trace!("watch ranked rpcs changed"); }, _ = sleep_until(start + max_wait) => break, }, } } } else if let Some(max_wait) = max_wait { + trace!(max_wait = max_wait.as_secs_f32(), "no potential rpcs"); select! { _ = watch_ranked_rpcs.changed() => { // no need to borrow_and_update because we do that at the top of the loop + trace!("watch ranked rpcs changed"); }, _ = sleep_until(start + max_wait) => break, } } else { + trace!("no potential rpcs and set to not wait"); break; } @@ -723,64 +731,11 @@ impl Web3Rpcs { &self, method: &str, params: &P, - max_tries: Option, max_wait: Option, ) -> Web3ProxyResult { // TODO: no request_metadata means we won't have stats on this internal request. - self.request_with_metadata_and_retries( - method, params, None, max_tries, max_wait, None, None, - ) - .await - } - - /// Make a request with stat tracking. - #[allow(clippy::too_many_arguments)] - pub async fn request_with_metadata_and_retries( - &self, - method: &str, - params: &P, - request_metadata: Option<&Arc>, - max_tries: Option, - max_wait: Option, - min_block_needed: Option<&U64>, - max_block_needed: Option<&U64>, - ) -> Web3ProxyResult { - let mut tries = max_tries.unwrap_or(1); - - let mut last_error = None; - - while tries > 0 { - tries -= 1; - - match self - .request_with_metadata( - method, - params, - request_metadata, - max_wait, - min_block_needed, - max_block_needed, - ) - .await - { - Ok(x) => return Ok(x), - Err(Web3ProxyError::JsonRpcErrorData(err)) => { - // TODO: retry some of these? i think request_with_metadata is already smart enough though - return Err(err.into()); - } - Err(err) => { - // TODO: only log params in dev - warn!(rpc=%self, %method, ?params, ?err, %tries, "retry-able error"); - last_error = Some(err) - } - } - } - - if let Some(err) = last_error { - return Err(err); - } - - Err(anyhow::anyhow!("no response, but no error either. this is a bug").into()) + self.request_with_metadata(method, params, None, max_wait, None, None) + .await } /// Make a request with stat tracking. @@ -1259,7 +1214,6 @@ impl Web3Rpcs { method: &str, params: &P, request_metadata: Option<&Arc>, - max_tries: Option, max_wait: Option, min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, @@ -1268,11 +1222,10 @@ impl Web3Rpcs { match proxy_mode { ProxyMode::Debug | ProxyMode::Best => { - self.request_with_metadata_and_retries( + self.request_with_metadata( method, params, request_metadata, - max_tries, max_wait, min_block_needed, max_block_needed, diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 5ab1de7b..2622ec94 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -303,7 +303,6 @@ impl Web3Rpc { &[(); 0], // error here are expected, so keep the level low Some(Level::DEBUG.into()), - Some(2), Some(Duration::from_secs(5)), ) .await @@ -328,7 +327,6 @@ impl Web3Rpc { )), // error here are expected, so keep the level low Some(Level::TRACE.into()), - Some(2), Some(Duration::from_secs(5)), ) .await; @@ -419,7 +417,6 @@ impl Web3Rpc { "eth_chainId", &[(); 0], Some(Level::TRACE.into()), - Some(2), Some(Duration::from_secs(5)), ) .await?; @@ -546,7 +543,6 @@ impl Web3Rpc { "eth_getTransactionByHash", &(txid,), error_handler, - Some(2), Some(Duration::from_secs(5)), ) .await? @@ -569,7 +565,6 @@ impl Web3Rpc { "eth_getCode", &(to, block_number), error_handler, - Some(2), Some(Duration::from_secs(5)), ) .await?; @@ -803,7 +798,6 @@ impl Web3Rpc { "eth_getBlockByNumber", &("latest", false), error_handler, - Some(2), Some(Duration::from_secs(5)), ) .await; @@ -839,7 +833,6 @@ impl Web3Rpc { "eth_getBlockByNumber", &("latest", false), error_handler, - Some(2), Some(Duration::from_secs(5)), ) .await; @@ -979,20 +972,12 @@ impl Web3Rpc { method: &str, params: &P, error_handler: Option, - max_tries: Option, max_wait: Option, ) -> Web3ProxyResult { let authorization = Default::default(); - self.authorized_request( - method, - params, - &authorization, - error_handler, - max_tries, - max_wait, - ) - .await + self.authorized_request(method, params, &authorization, error_handler, max_wait) + .await } pub async fn authorized_request( @@ -1001,42 +986,15 @@ impl Web3Rpc { params: &P, authorization: &Arc, error_handler: Option, - max_tries: Option, max_wait: Option, ) -> Web3ProxyResult { - // TODO: take max_wait as a function argument? - let mut tries = max_tries.unwrap_or(1); + let handle = self + .wait_for_request_handle(authorization, max_wait, error_handler) + .await?; - let mut last_error: Option = None; + let x = handle.request::(method, params).await?; - while tries > 0 { - tries -= 1; - - let handle = match self - .wait_for_request_handle(authorization, max_wait, error_handler) - .await - { - Ok(x) => x, - Err(err) => { - last_error = Some(err); - continue; - } - }; - - match handle.request::(method, params).await { - Ok(x) => return Ok(x), - Err(err) => { - last_error = Some(err.into()); - continue; - } - } - } - - if let Some(last_error) = last_error { - return Err(last_error); - } - - Err(anyhow::anyhow!("authorized_request failed in an unexpected way").into()) + Ok(x) } }