From 1e76543d24d45c5b85efbbbaa2e97506c3fd3a54 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Sat, 7 Sep 2024 21:03:07 +0200 Subject: [PATCH] Kafka TLS Registry integration: include tls-configuration-name in Kafka config, when it is configured Fixes #43107 --- .../quarkus/kafka/client/runtime/KafkaAdminClient.java | 8 ++++++-- .../client/runtime/KafkaRuntimeConfigProducer.java | 4 +++- .../kafka/client/tls/QuarkusKafkaSslEngineFactory.java | 10 ++++++++-- .../kafka/streams/runtime/KafkaStreamsProducer.java | 5 +++++ 4 files changed, 22 insertions(+), 5 deletions(-) diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaAdminClient.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaAdminClient.java index 1600561f3f1ef..62b36e7d1e90b 100644 --- a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaAdminClient.java +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaAdminClient.java @@ -1,5 +1,7 @@ package io.quarkus.kafka.client.runtime; +import static io.quarkus.kafka.client.runtime.KafkaRuntimeConfigProducer.TLS_CONFIG_NAME_KEY; + import java.util.*; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -33,8 +35,10 @@ void init() { Map conf = new HashMap<>(); conf.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, DEFAULT_ADMIN_CLIENT_TIMEOUT); for (Map.Entry entry : config.entrySet()) { - if (AdminClientConfig.configNames().contains(entry.getKey())) { - conf.put(entry.getKey(), entry.getValue().toString()); + String key = entry.getKey(); + // include TLS config name if it has been configured + if (TLS_CONFIG_NAME_KEY.equals(key) || AdminClientConfig.configNames().contains(key)) { + conf.put(key, entry.getValue().toString()); } } client = AdminClient.create(conf); diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaRuntimeConfigProducer.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaRuntimeConfigProducer.java index 7490541fc174d..ff679cd61c80e 100644 --- a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaRuntimeConfigProducer.java +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaRuntimeConfigProducer.java @@ -16,6 +16,8 @@ @Singleton public class KafkaRuntimeConfigProducer { + public static final String TLS_CONFIG_NAME_KEY = "tls-configuration-name"; + // not "kafka.", because we also inspect env vars, which start with "KAFKA_" private static final String CONFIG_PREFIX = "kafka"; private static final String UI_CONFIG_PREFIX = CONFIG_PREFIX + ".ui"; @@ -45,7 +47,7 @@ public Map createKafkaRuntimeConfig(Config config, ApplicationCo .replace("_", "."); String value = config.getOptionalValue(propertyName, String.class).orElse(""); result.put(effectivePropertyName, value); - if (effectivePropertyName.equals("tls-configuration-name")) { + if (effectivePropertyName.equals(TLS_CONFIG_NAME_KEY)) { result.put("ssl.engine.factory.class", QuarkusKafkaSslEngineFactory.class.getName()); } } diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/tls/QuarkusKafkaSslEngineFactory.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/tls/QuarkusKafkaSslEngineFactory.java index 42aab3be3e781..1ebd9d795aed1 100644 --- a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/tls/QuarkusKafkaSslEngineFactory.java +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/tls/QuarkusKafkaSslEngineFactory.java @@ -1,5 +1,7 @@ package io.quarkus.kafka.client.tls; +import static io.quarkus.kafka.client.runtime.KafkaRuntimeConfigProducer.TLS_CONFIG_NAME_KEY; + import java.io.IOException; import java.security.KeyStore; import java.util.Map; @@ -92,7 +94,11 @@ public void close() throws IOException { @Override public void configure(Map configs) { - String tlsConfigName = (String) configs.get("tls-configuration-name"); + String tlsConfigName = (String) configs.get(TLS_CONFIG_NAME_KEY); + if (tlsConfigName == null) { + throw new IllegalArgumentException( + "The 'tls-configuration-name' property is required for Kafka Quarkus TLS Registry integration."); + } Instance tlsConfig = CDI.current().getBeanManager().createInstance() .select(TlsConfigurationRegistry.class); @@ -118,7 +124,7 @@ public void configure(Map configs) { * @param configs the Kafka client configuration */ public static void checkForOtherSslConfigs(Map configs) { - String tlsConfigName = (String) configs.get("tls-configuration-name"); + String tlsConfigName = (String) configs.get(TLS_CONFIG_NAME_KEY); for (String sslConfig : KAFKA_SSL_CONFIGS) { if (configs.containsKey(sslConfig)) { log.warnf( diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java index 604017f831591..570ab735c8f07 100644 --- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java @@ -1,5 +1,6 @@ package io.quarkus.kafka.streams.runtime; +import static io.quarkus.kafka.client.runtime.KafkaRuntimeConfigProducer.TLS_CONFIG_NAME_KEY; import static io.quarkus.kafka.streams.runtime.KafkaStreamsRuntimeConfig.DEFAULT_KAFKA_BROKER; import java.net.InetSocketAddress; @@ -362,6 +363,10 @@ private static void waitForTopicsToBeCreated(Admin adminClient, Collection first