From 6daa92f76ea5e17b2e9286726b8f55bf180ef923 Mon Sep 17 00:00:00 2001 From: Paulo Casaes Date: Fri, 1 Mar 2024 11:15:41 -0800 Subject: [PATCH] Do not fail on resolve kafka streams topics when topics check disabled Resolves https://github.com/quarkusio/quarkus/issues/39120 --- .../kafka/streams/runtime/KafkaStreamsProducer.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java index f03ddabeb9bda6..604017f8315911 100644 --- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java @@ -109,18 +109,22 @@ public KafkaStreamsProducer(KafkaStreamsSupport kafkaStreamsSupport, KafkaStream this.executorService = executorService; this.topicsTimeout = runtimeConfig.topicsTimeout; - this.trimmedTopics = runtimeConfig.getTrimmedTopics(); + this.trimmedTopics = isTopicsCheckEnabled() ? runtimeConfig.getTrimmedTopics() : Collections.emptyList(); this.streamsConfig = new StreamsConfig(kafkaStreamsProperties); this.kafkaStreams = initializeKafkaStreams(streamsConfig, topology.get(), kafkaClientSupplier, stateListener, globalStateRestoreListener, uncaughtExceptionHandlerListener); this.kafkaStreamsTopologyManager = new KafkaStreamsTopologyManager(kafkaAdminClient); } + private boolean isTopicsCheckEnabled() { + return topicsTimeout.compareTo(Duration.ZERO) > 0; + } + public void onStartup(@Observes StartupEvent event, Event kafkaStreamsEvent) { if (kafkaStreams != null) { kafkaStreamsEvent.fire(kafkaStreams); executorService.execute(() -> { - if (topicsTimeout.compareTo(Duration.ZERO) > 0) { + if (isTopicsCheckEnabled()) { try { waitForTopicsToBeCreated(kafkaAdminClient, trimmedTopics, topicsTimeout); } catch (InterruptedException e) {