From e8d992168c04fa6bc7add418da9498608636e1a9 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 28 Jun 2023 18:36:17 -0700 Subject: [PATCH] more logs and retries --- web3_proxy/src/bin/web3_proxy_cli/main.rs | 6 ++ web3_proxy/src/bin/web3_proxy_cli/proxyd.rs | 21 +++++-- web3_proxy/src/block_number.rs | 8 ++- web3_proxy/src/compute_units.rs | 3 + web3_proxy/src/rpcs/consensus.rs | 4 +- web3_proxy/src/rpcs/many.rs | 68 +++++++++++++++++---- web3_proxy/src/rpcs/one.rs | 9 +-- web3_proxy/src/rpcs/request.rs | 6 +- web3_proxy/src/stats/stat_buffer.rs | 1 + 9 files changed, 98 insertions(+), 28 deletions(-) diff --git a/web3_proxy/src/bin/web3_proxy_cli/main.rs b/web3_proxy/src/bin/web3_proxy_cli/main.rs index b87812b6..85b0f759 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/main.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/main.rs @@ -231,6 +231,12 @@ fn main() -> anyhow::Result<()> { ..Default::default() }); + let maybe_chain_id = top_config.as_ref().map(|x| x.app.chain_id); + + sentry::configure_scope(|scope| { + scope.set_tag("chain_id", format!("{:?}", maybe_chain_id)); + }); + tracing_subscriber::fmt() // create a subscriber that uses the RUST_LOG env var for filtering levels .with_env_filter(EnvFilter::builder().parse(rust_log)?) diff --git a/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs b/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs index 09ed4ad2..3dfe8e3c 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs @@ -127,7 +127,21 @@ async fn run( prometheus_shutdown_receiver, )); - let _ = spawned_app.app.head_block_receiver().changed().await; + info!("waiting for head block"); + loop { + spawned_app.app.head_block_receiver().changed().await?; + + if spawned_app + .app + .head_block_receiver() + .borrow_and_update() + .is_some() + { + break; + } else { + info!("no head block yet!"); + } + } // start the frontend port let frontend_handle = tokio::spawn(frontend::serve( @@ -419,10 +433,7 @@ mod tests { let first_block_num = anvil_result.number.unwrap(); // mine a block - let _: U256 = anvil_provider - .request("evm_mine", ()) - .await - .unwrap(); + let _: U256 = anvil_provider.request("evm_mine", ()).await.unwrap(); // make sure the block advanced let anvil_result = anvil_provider diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index 99f98345..f299640d 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -263,7 +263,9 @@ impl CacheMode { *x = json!(block_num); } - todo!(); + let (block_hash, _) = rpcs.block_hash(authorization, &block_num).await?; + + BlockNumAndHash(block_num, block_hash) } else { warn!("if no from, what should we default? 0 or head?"); head_block.into() @@ -282,7 +284,9 @@ impl CacheMode { *x = json!(block_num); } - todo!(); + let (block_hash, _) = rpcs.block_hash(authorization, &block_num).await?; + + BlockNumAndHash(block_num, block_hash) } else { head_block.into() }; diff --git a/web3_proxy/src/compute_units.rs b/web3_proxy/src/compute_units.rs index 5a0c67d6..1d266089 100644 --- a/web3_proxy/src/compute_units.rs +++ b/web3_proxy/src/compute_units.rs @@ -80,6 +80,9 @@ impl ComputeUnit { (_, "eth_newBlockFilter") => 20, (_, "eth_newFilter") => 20, (_, "eth_newPendingTransactionFilter") => 20, + (_, "eth_pollSubscriptions") => { + return Self::unimplemented(); + } (_, "eth_protocolVersion") => 0, (_, "eth_sendRawTransaction") => 250, (_, "eth_sendUserOperation") => 1000, diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index cc09f7e5..7b6d5350 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -478,7 +478,7 @@ impl ConsensusFinder { match old_consensus_head_connections.as_ref() { None => { - debug!( + info!( "first {}/{} {}{}/{}/{} block={}, rpc={}", best_tier, worst_tier, @@ -836,7 +836,7 @@ impl ConsensusFinder { if num_known < web3_rpcs.min_synced_rpcs { // this keeps us from serving requests when the proxy first starts - info!(min_synced_rpcs=%web3_rpcs.min_synced_rpcs, "not enough servers known"); + info!(%num_known, min_synced_rpcs=%web3_rpcs.min_synced_rpcs, "not enough servers known"); return Ok(None); } diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 6479a1c3..f913a73c 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -735,8 +735,60 @@ impl Web3Rpcs { max_wait: Option, ) -> Web3ProxyResult { // TODO: no request_metadata means we won't have stats on this internal request. - self.request_with_metadata(method, params, None, max_tries, max_wait, None, None) - .await + self.request_with_metadata_and_retries( + method, params, None, max_tries, max_wait, None, None, + ) + .await + } + + /// Make a request with stat tracking. + #[allow(clippy::too_many_arguments)] + pub async fn request_with_metadata_and_retries( + &self, + method: &str, + params: &P, + request_metadata: Option<&Arc>, + max_tries: Option, + max_wait: Option, + min_block_needed: Option<&U64>, + max_block_needed: Option<&U64>, + ) -> Web3ProxyResult { + let mut tries = max_tries.unwrap_or(1); + + let mut last_error = None; + + while tries > 0 { + tries -= 1; + + match self + .request_with_metadata( + method, + params, + request_metadata, + max_wait, + min_block_needed, + max_block_needed, + ) + .await + { + Ok(x) => return Ok(x), + Err(Web3ProxyError::JsonRpcErrorData(err)) => { + // TODO: retry some of these? i think request_with_metadata is already smart enough though + return Err(err.into()); + } + Err(err) => { + // TODO: only log params in dev + warn!(rpc=%self, %method, ?params, ?err, "retry-able error"); + last_error = Some(err) + } + } + } + + if let Some(err) = last_error { + return Err(err); + } + + Err(anyhow::anyhow!("no response, but no error either. this is a bug").into()) } /// Make a request with stat tracking. @@ -746,7 +798,6 @@ impl Web3Rpcs { method: &str, params: &P, request_metadata: Option<&Arc>, - mut max_tries: Option, max_wait: Option, min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, @@ -771,15 +822,6 @@ impl Web3Rpcs { break; } } - if let Some(max_tries) = max_tries.as_mut() { - if *max_tries == 0 { - trace!("max_tries exceeded"); - break; - } - - // prepare for the next loop - *max_tries -= 1; - } match self .wait_for_best_rpc( @@ -1177,7 +1219,7 @@ impl Web3Rpcs { match proxy_mode { ProxyMode::Debug | ProxyMode::Best => { - self.request_with_metadata( + self.request_with_metadata_and_retries( method, params, request_metadata, diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 21d4fd05..6f79c00c 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -615,7 +615,7 @@ impl Web3Rpc { break; } - warn!("{} subscribe err: {:#?}", self, err) + warn!(rpc=%self, ?err, "subscribe err"); } else if self.should_disconnect() { break; } @@ -678,18 +678,18 @@ impl Web3Rpc { // subscribe to the disconnect watch. the app uses this when shutting down or when configs change if let Some(disconnect_watch_tx) = self.disconnect_watch.as_ref() { - let clone = self.clone(); + let rpc = self.clone(); let mut disconnect_watch_rx = disconnect_watch_tx.subscribe(); let f = async move { loop { if *disconnect_watch_rx.borrow_and_update() { - info!("disconnect triggered on {}", clone); break; } disconnect_watch_rx.changed().await?; } + info!(%rpc, "disconnect triggered"); Ok(()) }; @@ -721,7 +721,8 @@ impl Web3Rpc { // TODO: move this into a function and the chaining should be easier if let Err(err) = rpc.healthcheck(error_handler).await { // TODO: different level depending on the error handler - warn!("health checking {} failed: {:?}", rpc, err); + // TODO: if rate limit error, set "retry_at" + warn!(%rpc, ?err, "health check failed"); } } diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 0cef61fa..9f5ab6c5 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -383,8 +383,10 @@ impl OpenRequestHandle { } Err(err) => { warn!( - "failed parsing eth_call params. unable to save revert. {}", - err + %method, + ?params, + ?err, + "failed parsing eth_call params. unable to save revert", ); } } diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs index 55b19500..2c099a0c 100644 --- a/web3_proxy/src/stats/stat_buffer.rs +++ b/web3_proxy/src/stats/stat_buffer.rs @@ -164,6 +164,7 @@ impl StatBuffer { // TODO: wait on all websockets to close // TODO: wait on all pending external requests to finish + info!("waiting 10 seconds for remaining stats to arrive"); sleep(Duration::from_secs(10)).await; // loop {