diff --git a/web3_proxy/src/bin/web3_proxy_cli/search_kafka.rs b/web3_proxy/src/bin/web3_proxy_cli/search_kafka.rs index 35d11e9d..517e2f88 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/search_kafka.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/search_kafka.rs @@ -52,16 +52,25 @@ impl SearchKafkaSubCommand { let wanted_kafka_key = wanted_kafka_key.as_ref().map(|x| &x[..]); - let brokers = top_config + let kafka_brokers = top_config .app .kafka_urls .context("top_config.app.kafka_urls is required")?; let mut consumer = ClientConfig::new(); + let security_protocol = if kafka_brokers.starts_with("SSL:") { + "ssl" + } else if kafka_brokers.starts_with("TCP:") { + "plaintext" + } else { + return Err(anyhow::anyhow!("unexpected kafka protocol")); + }; + consumer - .set("bootstrap.servers", &brokers) + .set("bootstrap.servers", &kafka_brokers) .set("enable.partition.eof", "false") + .set("security.protocol", security_protocol) .set("session.timeout.ms", "6000") .set("enable.auto.commit", "false");