Skip to content

Commit

Permalink
Merge pull request quarkusio#39121 from pcasaes/pc/topics-check-kafka…
Browse files Browse the repository at this point in the history
…-streams

Do not fail on resolve kafka streams topics when topics check disabled
  • Loading branch information
gsmet authored Mar 5, 2024
2 parents 327dd98 + 809c48c commit a5efd43
Showing 1 changed file with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,18 +109,22 @@ public KafkaStreamsProducer(KafkaStreamsSupport kafkaStreamsSupport, KafkaStream
this.executorService = executorService;

this.topicsTimeout = runtimeConfig.topicsTimeout;
this.trimmedTopics = runtimeConfig.getTrimmedTopics();
this.trimmedTopics = isTopicsCheckEnabled() ? runtimeConfig.getTrimmedTopics() : Collections.emptyList();
this.streamsConfig = new StreamsConfig(kafkaStreamsProperties);
this.kafkaStreams = initializeKafkaStreams(streamsConfig, topology.get(),
kafkaClientSupplier, stateListener, globalStateRestoreListener, uncaughtExceptionHandlerListener);
this.kafkaStreamsTopologyManager = new KafkaStreamsTopologyManager(kafkaAdminClient);
}

private boolean isTopicsCheckEnabled() {
return topicsTimeout.compareTo(Duration.ZERO) > 0;
}

public void onStartup(@Observes StartupEvent event, Event<KafkaStreams> kafkaStreamsEvent) {
if (kafkaStreams != null) {
kafkaStreamsEvent.fire(kafkaStreams);
executorService.execute(() -> {
if (topicsTimeout.compareTo(Duration.ZERO) > 0) {
if (isTopicsCheckEnabled()) {
try {
waitForTopicsToBeCreated(kafkaAdminClient, trimmedTopics, topicsTimeout);
} catch (InterruptedException e) {
Expand Down

0 comments on commit a5efd43

Please sign in to comment.