From 19f9423c2bf6ca42133bb650fd4c247331a8774f Mon Sep 17 00:00:00 2001 From: jto Date: Mon, 28 Aug 2023 18:41:48 +0200 Subject: [PATCH] Fix NPE in tests --- .../apache/beam/runners/flink/FlinkPipelineOptions.java | 7 ++++--- .../flink/translation/wrappers/SourceInputFormat.java | 5 ++++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index 8d007072b250..1e01514fe8b6 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -313,11 +313,12 @@ public interface FlinkPipelineOptions void setFlinkConfDir(String confDir); - @Description("Set the maximum size of input split when data is read from a filesystem. -1 implies no max size.") - @Default.Long(-1) + @Description( + "Set the maximum size of input split when data is read from a filesystem. 0 implies no max size.") + @Default.Long(0) Long getFileInputSplitMaxSizeMB(); - void setFileInputSplitMaxSizeMB(Long inputSplitMaxSizeMB); + void setFileInputSplitMaxSizeMB(Long fileInputSplitMaxSizeMB); static FlinkPipelineOptions defaults() { return PipelineOptionsFactory.as(FlinkPipelineOptions.class); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java index c9502228901e..a1b8bced7a1d 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java @@ -113,7 +113,10 @@ public float getAverageRecordWidth() { private long getDesiredSizeBytes(int numSplits) throws Exception { long totalSize = initialSource.getEstimatedSizeBytes(options); long defaultSplitSize = totalSize / numSplits; - long maxSplitSize = options.as(FlinkPipelineOptions.class).getFileInputSplitMaxSizeMB(); + long maxSplitSize = 0; + if (options != null) { + maxSplitSize = options.as(FlinkPipelineOptions.class).getFileInputSplitMaxSizeMB(); + } if (initialSource instanceof FileBasedSource && maxSplitSize > 0) { // Most of the time parallelism is < number of files in source. // Each file becomes a unique split which commonly create skew.