diff --git a/Cargo.lock b/Cargo.lock index 71489963..046e086a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2973,18 +2973,6 @@ dependencies = [ "vcpkg", ] -[[package]] -name = "libz-sys" -version = "1.1.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d97137b25e321a73eef1418d1d5d2eda4d77e12813f8e6dead84bc52c5870a7b" -dependencies = [ - "cc", - "libc", - "pkg-config", - "vcpkg", -] - [[package]] name = "linux-raw-sys" version = "0.4.10" @@ -4164,7 +4152,6 @@ checksum = "ad63c279fca41a27c231c450a2d2ad18288032e9cbb159ad16c9d96eba35aaaf" dependencies = [ "cmake", "libc", - "libz-sys", "num_enum 0.5.11", "openssl-sys", "pkg-config", diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 498ca460..49017448 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -9,7 +9,7 @@ edition = "2021" default = [] mimalloc = ["dep:mimalloc"] -rdkafka-src = ["rdkafka/cmake-build", "rdkafka/ssl-vendored"] +rdkafka-src = ["dep:rdkafka", "rdkafka/cmake-build", "rdkafka/ssl-vendored"] stripe = ["dep:async-stripe"] tests-needing-docker = [] @@ -73,7 +73,7 @@ once_cell = { version = "1.18.0" } ordered-float = {version = "4.1.1" } pagerduty-rs = { version = "0.1.6", default-features = false, features = ["async", "rustls", "sync"] } parking_lot = { version = "0.12.1", features = ["arc_lock", "nightly"] } -rdkafka = { version = "0.34.0", features = ["tracing"] } +rdkafka = { version = "0.34.0", default-features = false, features = ["tokio", "tracing"], optional = true } reqwest = { version = "0.11.22", default-features = false, features = ["json", "rustls"] } rust_decimal = { version = "1.32.0" } sentry = { version = "0.31.7", default-features = false, features = ["anyhow", "backtrace", "contexts", "panic", "reqwest", "rustls", "serde_json", "tracing"] } diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 15f761c6..77d4981b 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -103,6 +103,7 @@ pub struct App { /// give some bonus capacity to public users pub bonus_ip_concurrency: Arc, /// the /debug/ rpc endpoints send detailed logging to kafka + #[cfg(feature = "rdkafka")] pub kafka_producer: Option, /// rate limit the login endpoint /// we do this because each pending login is a row in the database @@ -194,7 +195,10 @@ impl App { // connect to kafka for logging requests from the /debug/ urls + #[cfg(feature = "rdkafka")] let mut kafka_producer: Option = None; + + #[cfg(feature = "rdkafka")] if let Some(kafka_brokers) = top_config.app.kafka_urls.clone() { info!("Connecting to kafka"); @@ -505,6 +509,7 @@ impl App { jsonrpc_response_cache, jsonrpc_response_failed_cache_keys, jsonrpc_response_semaphores, + #[cfg(feature = "rdkafka")] kafka_producer, login_rate_limiter, pending_txid_firehose: deduped_txid_firehose, diff --git a/web3_proxy/src/jsonrpc/request_builder.rs b/web3_proxy/src/jsonrpc/request_builder.rs index d95d8f0a..9f46b964 100644 --- a/web3_proxy/src/jsonrpc/request_builder.rs +++ b/web3_proxy/src/jsonrpc/request_builder.rs @@ -1,3 +1,4 @@ +use super::{JsonRpcParams, LooseId, SingleRequest}; use crate::{ app::App, block_number::CacheMode, @@ -7,8 +8,6 @@ use crate::{ rpc_proxy_ws::ProxyMode, }, globals::APP, - jsonrpc, - kafka::KafkaDebugLogger, response_cache::JsonRpcQueryCacheKey, rpcs::{blockchain::Web3ProxyBlock, one::Web3Rpc}, secrets::RpcSecretKey, @@ -36,10 +35,14 @@ use tokio::{ sync::{mpsc, OwnedSemaphorePermit}, time::Instant, }; -use tracing::{error, trace, warn}; -use ulid::Ulid; +use tracing::{error, trace}; -use super::{JsonRpcParams, LooseId, SingleRequest}; +#[cfg(feature = "rdkafka")] +use { + crate::{jsonrpc, kafka::KafkaDebugLogger}, + tracing::warn, + ulid::Ulid, +}; #[derive(Derivative)] #[derivative(Default)] @@ -235,10 +238,14 @@ pub struct ValidatedRequest { /// If the request is invalid or received a jsonrpc error response (excluding reverts) pub user_error_response: AtomicBool, + #[cfg(feature = "rdkafka")] /// ProxyMode::Debug logs requests and responses with Kafka /// TODO: maybe this shouldn't be determined by ProxyMode. A request param should probably enable this pub kafka_debug_logger: Option>, + #[cfg(not(feature = "rdkafka"))] + pub kafka_debug_logger: Option<()>, + /// Cancel-safe channel for sending stats to the buffer pub stat_sender: Option>, @@ -310,7 +317,7 @@ impl ValidatedRequest { authorization: Arc, chain_id: u64, mut head_block: Option, - kafka_debug_logger: Option>, + #[cfg(feature = "rdkafka")] kafka_debug_logger: Option>, max_wait: Option, permit: Option, mut request: RequestOrMethod, @@ -325,11 +332,14 @@ impl ValidatedRequest { // we VERY INTENTIONALLY log to kafka BEFORE calculating the cache key // this is because calculating the cache_key may modify the params! // for example, if the request specifies "latest" as the block number, we replace it with the actual latest block number + #[cfg(feature = "rdkafka")] if let Some(ref kafka_debug_logger) = kafka_debug_logger { // TODO: channels might be more ergonomic than spawned futures // spawned things run in parallel easier but generally need more Arcs kafka_debug_logger.log_debug_request(&request); } + #[cfg(not(feature = "rdkafka"))] + let kafka_debug_logger = None; if head_block.is_none() { if let Some(app) = app { @@ -384,10 +394,11 @@ impl ValidatedRequest { request: RequestOrMethod, head_block: Option, ) -> Web3ProxyResult> { - // TODO: get this out of tracing instead (where we have a String from Amazon's LB) - let request_ulid = Ulid::new(); - + #[cfg(feature = "rdkafka")] let kafka_debug_logger = if matches!(authorization.checks.proxy_mode, ProxyMode::Debug) { + // TODO: get this out of tracing instead (where we have a String from Amazon's LB) + let request_ulid = Ulid::new(); + KafkaDebugLogger::try_new( app, authorization.clone(), @@ -408,6 +419,7 @@ impl ValidatedRequest { authorization, chain_id, head_block, + #[cfg(feature = "rdkafka")] kafka_debug_logger, max_wait, permit, @@ -447,6 +459,7 @@ impl ValidatedRequest { authorization, 0, head_block, + #[cfg(feature = "rdkafka")] None, max_wait, None, @@ -557,6 +570,7 @@ impl ValidatedRequest { // TODO: set user_error_response and error_response here instead of outside this function + #[cfg(feature = "rdkafka")] if let Some(kafka_debug_logger) = self.kafka_debug_logger.as_ref() { if let ResponseOrBytes::Response(response) = response { match response { diff --git a/web3_proxy/src/lib.rs b/web3_proxy/src/lib.rs index 9696918e..878bd7a6 100644 --- a/web3_proxy/src/lib.rs +++ b/web3_proxy/src/lib.rs @@ -16,7 +16,6 @@ pub mod frontend; pub mod globals; pub mod http_params; pub mod jsonrpc; -pub mod kafka; pub mod pagerduty; pub mod prelude; pub mod premium; @@ -29,3 +28,6 @@ pub mod secrets; pub mod stats; pub mod test_utils; pub mod user_token; + +#[cfg(feature = "rdkafka")] +pub mod kafka; diff --git a/web3_proxy/src/prelude.rs b/web3_proxy/src/prelude.rs index 5579da6c..cca1563c 100644 --- a/web3_proxy/src/prelude.rs +++ b/web3_proxy/src/prelude.rs @@ -17,7 +17,6 @@ pub use num; pub use ordered_float; pub use pagerduty_rs; pub use parking_lot; -pub use rdkafka; pub use reqwest; pub use rust_decimal; pub use sentry; @@ -31,3 +30,6 @@ pub use tracing; pub use ulid; pub use url; pub use uuid; + +#[cfg(feature = "rdkafka")] +pub use rdkafka; diff --git a/web3_proxy_cli/src/main.rs b/web3_proxy_cli/src/main.rs index 6f7d06f9..7f491615 100644 --- a/web3_proxy_cli/src/main.rs +++ b/web3_proxy_cli/src/main.rs @@ -78,6 +78,7 @@ enum SubCommand { PopularityContest(sub_commands::PopularityContestSubCommand), Proxyd(sub_commands::ProxydSubCommand), RpcAccounting(sub_commands::RpcAccountingSubCommand), + #[cfg(feature = "rdkafka")] SearchKafka(sub_commands::SearchKafkaSubCommand), Sentryd(sub_commands::SentrydSubCommand), TransferKey(sub_commands::TransferKeySubCommand), @@ -285,7 +286,11 @@ fn main() -> anyhow::Result<()> { // set up tokio's async runtime let mut rt_builder = runtime::Builder::new_multi_thread(); - rt_builder.enable_all(); + rt_builder.enable_io(); + rt_builder.enable_time(); + + // TODO: debug option to enable and expose this + // rt_builder.enable_metrics_poll_count_histogram(); if cli_config.workers > 0 { rt_builder.worker_threads(cli_config.workers); @@ -449,6 +454,7 @@ fn main() -> anyhow::Result<()> { x.main(pagerduty_async, top_config).await } SubCommand::PopularityContest(x) => x.main().await, + #[cfg(feature = "rdkafka")] SubCommand::SearchKafka(x) => x.main(top_config.unwrap()).await, SubCommand::Sentryd(x) => { if cli_config.sentry_url.is_none() { diff --git a/web3_proxy_cli/src/sub_commands/mod.rs b/web3_proxy_cli/src/sub_commands/mod.rs index f4fd242a..66e8445e 100644 --- a/web3_proxy_cli/src/sub_commands/mod.rs +++ b/web3_proxy_cli/src/sub_commands/mod.rs @@ -16,12 +16,14 @@ mod pagerduty; mod popularity_contest; mod proxyd; mod rpc_accounting; -mod search_kafka; mod sentryd; mod transfer_key; mod user_export; mod user_import; +#[cfg(feature = "rdkafka")] +mod search_kafka; + pub use self::change_admin_status::ChangeAdminStatusSubCommand; pub use self::change_user_address::ChangeUserAddressSubCommand; pub use self::change_user_tier::ChangeUserTierSubCommand; @@ -40,8 +42,10 @@ pub use self::pagerduty::PagerdutySubCommand; pub use self::popularity_contest::PopularityContestSubCommand; pub use self::proxyd::ProxydSubCommand; pub use self::rpc_accounting::RpcAccountingSubCommand; -pub use self::search_kafka::SearchKafkaSubCommand; pub use self::sentryd::SentrydSubCommand; pub use self::transfer_key::TransferKeySubCommand; pub use self::user_export::UserExportSubCommand; pub use self::user_import::UserImportSubCommand; + +#[cfg(feature = "rdkafka")] +pub use self::search_kafka::SearchKafkaSubCommand;