add security.protocol to search_kafka script
This commit is contained in:
parent
decdac4b6d
commit
f5775e5577
@ -52,16 +52,25 @@ impl SearchKafkaSubCommand {
|
||||
|
||||
let wanted_kafka_key = wanted_kafka_key.as_ref().map(|x| &x[..]);
|
||||
|
||||
let brokers = top_config
|
||||
let kafka_brokers = top_config
|
||||
.app
|
||||
.kafka_urls
|
||||
.context("top_config.app.kafka_urls is required")?;
|
||||
|
||||
let mut consumer = ClientConfig::new();
|
||||
|
||||
let security_protocol = if kafka_brokers.starts_with("SSL:") {
|
||||
"ssl"
|
||||
} else if kafka_brokers.starts_with("TCP:") {
|
||||
"plaintext"
|
||||
} else {
|
||||
return Err(anyhow::anyhow!("unexpected kafka protocol"));
|
||||
};
|
||||
|
||||
consumer
|
||||
.set("bootstrap.servers", &brokers)
|
||||
.set("bootstrap.servers", &kafka_brokers)
|
||||
.set("enable.partition.eof", "false")
|
||||
.set("security.protocol", security_protocol)
|
||||
.set("session.timeout.ms", "6000")
|
||||
.set("enable.auto.commit", "false");
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user