From f5775e557706c2b66442d4850d8970a731c5e053 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 19 Apr 2023 14:28:28 -0700 Subject: [PATCH] add security.protocol to search_kafka script --- web3_proxy/src/bin/web3_proxy_cli/search_kafka.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/web3_proxy/src/bin/web3_proxy_cli/search_kafka.rs b/web3_proxy/src/bin/web3_proxy_cli/search_kafka.rs index 35d11e9d..517e2f88 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/search_kafka.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/search_kafka.rs @@ -52,16 +52,25 @@ impl SearchKafkaSubCommand { let wanted_kafka_key = wanted_kafka_key.as_ref().map(|x| &x[..]); - let brokers = top_config + let kafka_brokers = top_config .app .kafka_urls .context("top_config.app.kafka_urls is required")?; let mut consumer = ClientConfig::new(); + let security_protocol = if kafka_brokers.starts_with("SSL:") { + "ssl" + } else if kafka_brokers.starts_with("TCP:") { + "plaintext" + } else { + return Err(anyhow::anyhow!("unexpected kafka protocol")); + }; + consumer - .set("bootstrap.servers", &brokers) + .set("bootstrap.servers", &kafka_brokers) .set("enable.partition.eof", "false") + .set("security.protocol", security_protocol) .set("session.timeout.ms", "6000") .set("enable.auto.commit", "false");