Skip to content

Commit

Permalink
Do not fail on resolve kafka streams topics when topics check disabled
Browse files Browse the repository at this point in the history
Resolves
#39120
  • Loading branch information
pcasaes committed Mar 4, 2024
1 parent 607ece4 commit 6daa92f
Showing 1 changed file with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,18 +109,22 @@ public KafkaStreamsProducer(KafkaStreamsSupport kafkaStreamsSupport, KafkaStream
this.executorService = executorService;

this.topicsTimeout = runtimeConfig.topicsTimeout;
this.trimmedTopics = runtimeConfig.getTrimmedTopics();
this.trimmedTopics = isTopicsCheckEnabled() ? runtimeConfig.getTrimmedTopics() : Collections.emptyList();
this.streamsConfig = new StreamsConfig(kafkaStreamsProperties);
this.kafkaStreams = initializeKafkaStreams(streamsConfig, topology.get(),
kafkaClientSupplier, stateListener, globalStateRestoreListener, uncaughtExceptionHandlerListener);
this.kafkaStreamsTopologyManager = new KafkaStreamsTopologyManager(kafkaAdminClient);
}

private boolean isTopicsCheckEnabled() {
return topicsTimeout.compareTo(Duration.ZERO) > 0;
}

public void onStartup(@Observes StartupEvent event, Event<KafkaStreams> kafkaStreamsEvent) {
if (kafkaStreams != null) {
kafkaStreamsEvent.fire(kafkaStreams);
executorService.execute(() -> {
if (topicsTimeout.compareTo(Duration.ZERO) > 0) {
if (isTopicsCheckEnabled()) {
try {
waitForTopicsToBeCreated(kafkaAdminClient, trimmedTopics, topicsTimeout);
} catch (InterruptedException e) {
Expand Down

0 comments on commit 6daa92f

Please sign in to comment.