diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java index f03ddabeb9bda..604017f831591 100644 --- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java @@ -109,18 +109,22 @@ public KafkaStreamsProducer(KafkaStreamsSupport kafkaStreamsSupport, KafkaStream this.executorService = executorService; this.topicsTimeout = runtimeConfig.topicsTimeout; - this.trimmedTopics = runtimeConfig.getTrimmedTopics(); + this.trimmedTopics = isTopicsCheckEnabled() ? runtimeConfig.getTrimmedTopics() : Collections.emptyList(); this.streamsConfig = new StreamsConfig(kafkaStreamsProperties); this.kafkaStreams = initializeKafkaStreams(streamsConfig, topology.get(), kafkaClientSupplier, stateListener, globalStateRestoreListener, uncaughtExceptionHandlerListener); this.kafkaStreamsTopologyManager = new KafkaStreamsTopologyManager(kafkaAdminClient); } + private boolean isTopicsCheckEnabled() { + return topicsTimeout.compareTo(Duration.ZERO) > 0; + } + public void onStartup(@Observes StartupEvent event, Event kafkaStreamsEvent) { if (kafkaStreams != null) { kafkaStreamsEvent.fire(kafkaStreams); executorService.execute(() -> { - if (topicsTimeout.compareTo(Duration.ZERO) > 0) { + if (isTopicsCheckEnabled()) { try { waitForTopicsToBeCreated(kafkaAdminClient, trimmedTopics, topicsTimeout); } catch (InterruptedException e) {