From 0069e760408110169dc5652740df0b645f39d291 Mon Sep 17 00:00:00 2001 From: yenicelik Date: Tue, 4 Apr 2023 14:40:22 +0200 Subject: [PATCH] latest changes from bryan --- web3_proxy/src/app/mod.rs | 24 +- .../bin/web3_proxy_cli/migrate_stats_to_v2.rs | 15 +- web3_proxy/src/bin/web3_proxy_cli/proxyd.rs | 26 +-- web3_proxy/src/frontend/mod.rs | 21 +- web3_proxy/src/rpcs/many.rs | 39 +--- web3_proxy/src/rpcs/one.rs | 8 +- web3_proxy/src/rpcs/request.rs | 206 +++++++----------- 7 files changed, 120 insertions(+), 219 deletions(-) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 71fc42d8..2c93faac 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -820,18 +820,18 @@ impl Web3ProxyApp { app_handles.push(config_handle); } - // ======= - // if important_background_handles.is_empty() { - // info!("no important background handles"); - // - // let f = tokio::spawn(async move { - // let _ = background_shutdown_receiver.recv().await; - // - // Ok(()) - // }); - // - // important_background_handles.push(f); - // >>>>>>> 77df3fa (stats v2) + + if important_background_handles.is_empty() { + info!("no important background handles"); + + let f = tokio::spawn(async move { + let _ = background_shutdown_receiver.recv().await; + + Ok(()) + }); + + important_background_handles.push(f); + } Ok(( app, diff --git a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs index 41d880b8..4a7a4cee 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs @@ -279,14 +279,15 @@ impl MigrateStatsToV2 { ); drop(stat_sender); - // match app_shutdown_sender.send(()) { - // Err(x) => { - // panic!("Could not send shutdown signal! {:?}", x); - // } - // _ => {} - // }; - // Drop the background handle, wait for any tasks that are on-going + match app_shutdown_sender.send(()) { + Err(x) => { + panic!("Could not send shutdown signal! {:?}", x); + } + _ => {} + }; + + // Wait for any tasks that are on-going while let Some(x) = important_background_handles.next().await { info!("Returned item is: {:?}", x); match x { diff --git a/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs b/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs index f46835b8..312625a3 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs @@ -190,31 +190,8 @@ async fn run( prometheus_shutdown_receiver, )); - // wait until the app has seen its first consensus head block - // if backups were included, wait a little longer - // let _ = spawned_app.app.head_block_receiver().changed().await; - // if backups were included, wait a little longer - // for _ in 0..3 { - // let _ = spawned_app.consensus_connections_watcher.changed().await; - // - // let consensus = spawned_app - // .consensus_connections_watcher - // .borrow_and_update(); - // - // // Let's just do a super dirty unwrap to get things going - // if consensus.unwrap().backups_needed { - // info!( - // "waiting longer. found consensus with backups: {}", - // consensus.unwrap().head_block.as_ref().unwrap(), - // ); - // } else { - // // TODO: also check that we have at least one archive node connected? - // break; - // } - // } let _ = spawned_app.app.head_block_receiver().changed().await; - // start the frontend port let frontend_handle = tokio::spawn(frontend::serve( app_frontend_port, @@ -268,6 +245,7 @@ async fn run( } } } + // TODO: This seems to have been removed on the main branch // TODO: how can we properly watch background handles here? this returns None immediatly and the app exits. i think the bug is somewhere else though x = spawned_app.background_handles.next() => { match x { @@ -284,6 +262,7 @@ async fn run( } }; + // TODO: This is also not there on the main branch // if a future above completed, make sure the frontend knows to start turning off if !frontend_exited { if let Err(err) = frontend_shutdown_sender.send(()) { @@ -292,6 +271,7 @@ async fn run( }; } + // TODO: Also not there on main branch // TODO: wait until the frontend completes if let Err(err) = frontend_shutdown_complete_receiver.recv().await { warn!("shutdown completition err={:?}", err); diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index 06ba3ed1..549ef287 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -238,18 +238,13 @@ pub async fn serve( let server = axum::Server::bind(&addr) // TODO: option to use with_connect_info. we want it in dev, but not when running behind a proxy, but not .serve(service) - // <<<<<<< HEAD - // .with_graceful_shutdown(async move { - // let _ = shutdown_receiver.recv().await; - // }) - // .await - // .map_err(Into::into); - // - // let _ = shutdown_complete_sender.send(()); - // - // server - // ======= - .await?; + .with_graceful_shutdown(async move { + let _ = shutdown_receiver.recv().await; + }) + .await + .map_err(Into::into); - Ok(()) + let _ = shutdown_complete_sender.send(()); + + server } diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 0f6f5ffb..ce89dcf5 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -2,7 +2,7 @@ use super::blockchain::{BlocksByHashCache, Web3ProxyBlock}; use super::consensus::ConsensusWeb3Rpcs; use super::one::Web3Rpc; -use super::request::{OpenRequestHandle, OpenRequestResult, RequestErrorHandler}; +use super::request::{OpenRequestHandle, OpenRequestResult, RequestRevertHandler}; use crate::app::{flatten_handle, AnyhowJoinHandle, Web3ProxyApp}; ///! Load balanced communication with a group of web3 providers use crate::config::{BlockAndRpc, TxHashAndRpc, Web3RpcConfig}; @@ -316,43 +316,8 @@ impl Web3Rpcs { } } - // <<<<<<< HEAD Ok(()) } - // ======= - // // TODO: max_capacity and time_to_idle from config - // // all block hashes are the same size, so no need for weigher - // let block_hashes = Cache::builder() - // .time_to_idle(Duration::from_secs(600)) - // .max_capacity(10_000) - // .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); - // // all block numbers are the same size, so no need for weigher - // let block_numbers = Cache::builder() - // .time_to_idle(Duration::from_secs(600)) - // .max_capacity(10_000) - // .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); - // - // let (watch_consensus_connections_sender, consensus_connections_watcher) = - // watch::channel(Default::default()); - // - // let watch_consensus_head_receiver = - // watch_consensus_head_sender.as_ref().map(|x| x.subscribe()); - // - // let connections = Arc::new(Self { - // by_name: connections, - // watch_consensus_rpcs_sender: watch_consensus_connections_sender, - // watch_consensus_head_receiver, - // pending_transactions, - // block_hashes, - // block_numbers, - // min_sum_soft_limit, - // min_head_rpcs, - // max_block_age, - // max_block_lag, - // }); - // - // let authorization = Arc::new(Authorization::internal(db_conn.clone())?); - // >>>>>>> 77df3fa (stats v2) pub fn get(&self, conn_name: &str) -> Option> { self.by_name.read().get(conn_name).cloned() @@ -913,7 +878,7 @@ impl Web3Rpcs { .request( &request.method, &json!(request.params), - RequestErrorHandler::SaveRevert, + RequestRevertHandler::Save, None, ) .await; diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 03cf6834..63114df3 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -5,7 +5,7 @@ use super::request::{OpenRequestHandle, OpenRequestResult}; use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::config::{BlockAndRpc, Web3RpcConfig}; use crate::frontend::authorization::Authorization; -use crate::rpcs::request::RequestErrorHandler; +use crate::rpcs::request::RequestRevertHandler; use anyhow::{anyhow, Context}; use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64}; use ethers::types::{Address, Transaction, U256}; @@ -716,9 +716,9 @@ impl Web3Rpc { tx_id_sender: Option)>>, ) -> anyhow::Result<()> { let error_handler = if self.backup { - RequestErrorHandler::DebugLevel + RequestRevertHandler::DebugLevel } else { - RequestErrorHandler::ErrorLevel + RequestRevertHandler::ErrorLevel }; let mut delay_start = false; @@ -1332,7 +1332,7 @@ impl Web3Rpc { self: &Arc, method: &str, params: &P, - revert_handler: RequestErrorHandler, + revert_handler: RequestRevertHandler, authorization: Arc, unlocked_provider: Option>, ) -> anyhow::Result diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 8b9b4da0..2bf24b2f 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -35,7 +35,7 @@ pub struct OpenRequestHandle { /// Depending on the context, RPC errors can require different handling. #[derive(Copy, Clone)] -pub enum RequestErrorHandler { +pub enum RequestRevertHandler { /// Log at the trace level. Use when errors are expected. TraceLevel, /// Log at the debug level. Use when errors are expected. @@ -45,7 +45,7 @@ pub enum RequestErrorHandler { /// Log at the warn level. Use when errors do not cause problems. WarnLevel, /// Potentially save the revert. Users can tune how often this happens - SaveRevert, + Save, } // TODO: second param could be skipped since we don't need it here @@ -58,13 +58,13 @@ struct EthCallFirstParams { data: Option, } -impl From for RequestErrorHandler { +impl From for RequestRevertHandler { fn from(level: Level) -> Self { match level { - Level::Trace => RequestErrorHandler::TraceLevel, - Level::Debug => RequestErrorHandler::DebugLevel, - Level::Error => RequestErrorHandler::ErrorLevel, - Level::Warn => RequestErrorHandler::WarnLevel, + Level::Trace => RequestRevertHandler::TraceLevel, + Level::Debug => RequestRevertHandler::DebugLevel, + Level::Error => RequestRevertHandler::ErrorLevel, + Level::Warn => RequestRevertHandler::WarnLevel, _ => unimplemented!("unexpected tracing Level"), } } @@ -150,7 +150,7 @@ impl OpenRequestHandle { self, method: &str, params: &P, - revert_handler: RequestErrorHandler, + revert_handler: RequestRevertHandler, unlocked_provider: Option>, ) -> Result where @@ -229,17 +229,17 @@ impl OpenRequestHandle { if let Err(err) = &response { // only save reverts for some types of calls // TODO: do something special for eth_sendRawTransaction too - let error_handler = if let RequestErrorHandler::SaveRevert = revert_handler { + let error_handler = if let RequestRevertHandler::Save = revert_handler { // TODO: should all these be Trace or Debug or a mix? if !["eth_call", "eth_estimateGas"].contains(&method) { // trace!(%method, "skipping save on revert"); - RequestErrorHandler::TraceLevel + RequestRevertHandler::TraceLevel } else if self.authorization.db_conn.is_some() { let log_revert_chance = self.authorization.checks.log_revert_chance; if log_revert_chance == 0.0 { // trace!(%method, "no chance. skipping save on revert"); - RequestErrorHandler::TraceLevel + RequestRevertHandler::TraceLevel } else if log_revert_chance == 1.0 { // trace!(%method, "gaurenteed chance. SAVING on revert"); revert_handler @@ -247,7 +247,7 @@ impl OpenRequestHandle { < log_revert_chance { // trace!(%method, "missed chance. skipping save on revert"); - RequestErrorHandler::TraceLevel + RequestRevertHandler::TraceLevel } else { // trace!("Saving on revert"); // TODO: is always logging at debug level fine? @@ -255,7 +255,7 @@ impl OpenRequestHandle { } } else { // trace!(%method, "no database. skipping save on revert"); - RequestErrorHandler::TraceLevel + RequestRevertHandler::TraceLevel } } else { revert_handler @@ -263,10 +263,10 @@ impl OpenRequestHandle { // TODO: simple enum -> string derive? #[derive(Debug)] - enum ResponseErrorType { + enum ResponseTypes { Revert, RateLimit, - Error, + Ok, } // check for "execution reverted" here @@ -312,127 +312,87 @@ impl OpenRequestHandle { if let Some(msg) = msg { if msg.starts_with("execution reverted") { trace!("revert from {}", self.rpc); - ResponseErrorType::Revert + ResponseTypes::Revert } else if msg.contains("limit") || msg.contains("request") { trace!("rate limit from {}", self.rpc); - ResponseErrorType::RateLimit + ResponseTypes::RateLimit } else { - ResponseErrorType::Error + ResponseTypes::Ok } } else { - ResponseErrorType::Error + ResponseTypes::Ok } } else { - ResponseErrorType::Error + ResponseTypes::Ok }; - match response_type { - ResponseErrorType::RateLimit => { - if let Some(hard_limit_until) = self.rpc.hard_limit_until.as_ref() { - // TODO: how long? different providers have different rate limiting periods, though most seem to be 1 second - // TODO: until the next second, or wait 1 whole second? - let retry_at = Instant::now() + Duration::from_secs(1); + if matches!(response_type, ResponseTypes::RateLimit) { + if let Some(hard_limit_until) = self.rpc.hard_limit_until.as_ref() { + let retry_at = Instant::now() + Duration::from_secs(1); - trace!("retry {} at: {:?}", self.rpc, retry_at); + trace!("retry {} at: {:?}", self.rpc, retry_at); - hard_limit_until.send_replace(retry_at); - } - } - ResponseErrorType::Error => { - // TODO: should we just have Error or RateLimit? do we need Error and Revert separate? - - match error_handler { - RequestErrorHandler::DebugLevel => { - // TODO: include params only if not running in release mode - debug!( - "error response from {}! method={} params={:?} err={:?}", - self.rpc, method, params, err - ); - } - RequestErrorHandler::TraceLevel => { - trace!( - "error response from {}! method={} params={:?} err={:?}", - self.rpc, - method, - params, - err - ); - } - RequestErrorHandler::ErrorLevel => { - // TODO: include params only if not running in release mode - error!( - "error response from {}! method={} err={:?}", - self.rpc, method, err - ); - } - RequestErrorHandler::SaveRevert | RequestErrorHandler::WarnLevel => { - // TODO: include params only if not running in release mode - warn!( - "error response from {}! method={} err={:?}", - self.rpc, method, err - ); - } - } - } - ResponseErrorType::Revert => { - match error_handler { - RequestErrorHandler::DebugLevel => { - // TODO: include params only if not running in release mode - debug!( - "revert response from {}! method={} params={:?} err={:?}", - self.rpc, method, params, err - ); - } - RequestErrorHandler::TraceLevel => { - trace!( - "revert response from {}! method={} params={:?} err={:?}", - self.rpc, - method, - params, - err - ); - } - RequestErrorHandler::ErrorLevel => { - // TODO: include params only if not running in release mode - error!( - "revert response from {}! method={} err={:?}", - self.rpc, method, err - ); - } - RequestErrorHandler::WarnLevel => { - // TODO: include params only if not running in release mode - warn!( - "revert response from {}! method={} err={:?}", - self.rpc, method, err - ); - } - RequestErrorHandler::SaveRevert => { - trace!( - "revert response from {}! method={} params={:?} err={:?}", - self.rpc, - method, - params, - err - ); - - // TODO: do not unwrap! (doesn't matter much since we check method as a string above) - let method: Method = - Method::try_from_value(&method.to_string()).unwrap(); - - // TODO: DO NOT UNWRAP! But also figure out the best way to keep returning ProviderErrors here - let params: EthCallParams = serde_json::from_value(json!(params)) - .context("parsing params to EthCallParams") - .unwrap(); - - // spawn saving to the database so we don't slow down the request - let f = self.authorization.clone().save_revert(method, params.0 .0); - - tokio::spawn(f); - } - } + hard_limit_until.send_replace(retry_at); + } + } + + // TODO: think more about the method and param logs. those can be sensitive information + match revert_handler { + RequestRevertHandler::DebugLevel => { + // TODO: think about this revert check more. sometimes we might want reverts logged so this needs a flag + if matches!(response_type, ResponseTypes::Revert) { + debug!( + "bad response from {}! method={} params={:?} err={:?}", + self.rpc, method, params, err + ); + } + } + RequestRevertHandler::TraceLevel => { + trace!( + "bad response from {}! method={} params={:?} err={:?}", + self.rpc, + method, + params, + err + ); + } + RequestRevertHandler::ErrorLevel => { + // TODO: include params if not running in release mode + error!( + "bad response from {}! method={} err={:?}", + self.rpc, method, err + ); + } + RequestRevertHandler::WarnLevel => { + // TODO: include params if not running in release mode + warn!( + "bad response from {}! method={} err={:?}", + self.rpc, method, err + ); + } + RequestRevertHandler::Save => { + trace!( + "bad response from {}! method={} params={:?} err={:?}", + self.rpc, + method, + params, + err + ); + + // TODO: do not unwrap! (doesn't matter much since we check method as a string above) + let method: Method = Method::try_from_value(&method.to_string()).unwrap(); + + // TODO: DO NOT UNWRAP! But also figure out the best way to keep returning ProviderErrors here + let params: EthCallParams = serde_json::from_value(json!(params)) + .context("parsing params to EthCallParams") + .unwrap(); + + // spawn saving to the database so we don't slow down the request + let f = self.authorization.clone().save_revert(method, params.0 .0); + + tokio::spawn(f); } } - // TODO: track error latency? } else { // TODO: record request latency // let latency_ms = start.elapsed().as_secs_f64() * 1000.0;