Skip to content

Commit

Permalink
feat(kafka-runner): create state dir early to avoid concurrent access
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Jan 31, 2022
1 parent 2c11418 commit e660ed8
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public void run() {

this.streams = this.kafkaExecutors
.stream()
.parallel()
.map(executor -> {
Properties properties = new Properties();
// build
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.kestra.runner.kafka.configs.StreamDefaultsConfig;
import org.slf4j.Logger;

import java.io.File;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
Expand Down Expand Up @@ -97,8 +98,17 @@ public KafkaStreamService.Stream of(Class<?> clientId, Class<?> groupId, Topolog
);
}

if (properties.containsKey(StreamsConfig.STATE_DIR_CONFIG)) {
File stateDir = new File((String) properties.get(StreamsConfig.STATE_DIR_CONFIG));

if (!stateDir.exists()) {
//noinspection ResultOfMethodCallIgnored
stateDir.mkdirs();
}
}

Stream stream = new Stream(topology, properties, metricsEnabled ? metricRegistry : null, logger);
eventPublisher.publishEvent(new KafkaStreamEndpoint.Event(clientId.getName(), stream));
eventPublisher.publishEventAsync(new KafkaStreamEndpoint.Event(clientId.getName(), stream));

return stream;
}
Expand Down

0 comments on commit e660ed8

Please sign in to comment.