diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 46fef651..03f15b35 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -56,7 +56,7 @@ pagerduty-rs = { version = "0.1.6", default-features = false, features = ["async parking_lot = { version = "0.12.1", features = ["arc_lock"] } prettytable = "*" proctitle = "0.1.1" -rdkafka = { version = "0.29.0", optional = true } +rdkafka = { version = "0.29.0" } regex = "1.7.1" reqwest = { version = "0.11.14", default-features = false, features = ["json", "tokio-rustls"] } rmp-serde = "1.1.1" diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 0e08ad32..9e88eff2 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -37,9 +37,7 @@ use migration::sea_orm::{ use migration::sea_query::table::ColumnDef; use migration::{Alias, DbErr, Migrator, MigratorTrait, Table}; use moka::future::Cache; -#[cfg(rdkafka)] use rdkafka::message::{Header, OwnedHeaders}; -#[cfg(rdkafka)] use rdkafka::producer::FutureRecord; use redis_rate_limiter::redis::AsyncCommands; use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter}; @@ -230,11 +228,7 @@ pub struct Web3ProxyApp { pub bearer_token_semaphores: Cache, hashbrown::hash_map::DefaultHashBuilder>, pub stat_sender: Option>, - - #[cfg(rdkafka)] pub kafka_producer: Option, - #[cfg(not(rdkafka))] - pub kafka_producer: Option<()>, } /// flatten a JoinError into an anyhow error @@ -469,12 +463,7 @@ impl Web3ProxyApp { // connect to kafka for logging requests from the /debug/ urls - #[cfg(rdkafka)] let mut kafka_producer: Option = None; - #[cfg(not(rdkafka))] - let kafka_producer: Option<()> = None; - - #[cfg(rdkafka)] if let Some(kafka_brokers) = top_config.app.kafka_urls.clone() { match rdkafka::ClientConfig::new() .set("bootstrap.servers", kafka_brokers) @@ -1095,10 +1084,7 @@ impl Web3ProxyApp { let request_metadata = Arc::new(RequestMetadata::new(REQUEST_PERIOD, request.num_bytes())?); - #[cfg(rdkafka)] let mut kafka_stuff = None; - - #[cfg(rdkafka)] if let Some(kafka_producer) = self.kafka_producer.clone() { let request_bytes = rmp_serde::to_vec(&request)?; @@ -1114,8 +1100,10 @@ impl Web3ProxyApp { let request_hash = Some(keccak256(&request_bytes)); - // the third item is added with the response - let kafka_headers = OwnedHeaders::new_with_capacity(3) + let chain_id = self.config.chain_id; + + // another item is added with the response + let kafka_headers = OwnedHeaders::new_with_capacity(4) .insert(Header { key: "request_hash", value: request_hash.as_ref(), @@ -1123,6 +1111,10 @@ impl Web3ProxyApp { .insert(Header { key: "head_block_num", value: head_block_num.map(|x| x.to_string()).as_ref(), + }) + .insert(Header { + key: "chain_id", + value: Some(&chain_id.to_le_bytes()), }); // save the key and headers for when we log the response @@ -1752,7 +1744,6 @@ impl Web3ProxyApp { .context("stat_sender sending response stat")?; } - #[cfg(rdkafka)] if let Some((kafka_key, kafka_headers)) = kafka_stuff { let kafka_producer = self .kafka_producer diff --git a/web3_proxy/src/bin/web3_proxy_cli/main.rs b/web3_proxy/src/bin/web3_proxy_cli/main.rs index 31ed40fb..9a40e763 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/main.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/main.rs @@ -376,7 +376,7 @@ fn main() -> anyhow::Result<()> { x.main(pagerduty_async, top_config).await } SubCommand::PopularityContest(x) => x.main().await, - SubCommand::SearchKafka(x) => x.main().await, + SubCommand::SearchKafka(x) => x.main(top_config.unwrap()).await, SubCommand::Sentryd(x) => { if cli_config.sentry_url.is_none() { warn!("sentry_url is not set! Logs will only show in this console");