Skip to content

Commit

Permalink
Fix NPE in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jto committed Aug 29, 2023
1 parent 9579fe0 commit 19f9423
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -313,11 +313,12 @@ public interface FlinkPipelineOptions

void setFlinkConfDir(String confDir);

@Description("Set the maximum size of input split when data is read from a filesystem. -1 implies no max size.")
@Default.Long(-1)
@Description(
"Set the maximum size of input split when data is read from a filesystem. 0 implies no max size.")
@Default.Long(0)
Long getFileInputSplitMaxSizeMB();

void setFileInputSplitMaxSizeMB(Long inputSplitMaxSizeMB);
void setFileInputSplitMaxSizeMB(Long fileInputSplitMaxSizeMB);

static FlinkPipelineOptions defaults() {
return PipelineOptionsFactory.as(FlinkPipelineOptions.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,10 @@ public float getAverageRecordWidth() {
private long getDesiredSizeBytes(int numSplits) throws Exception {
long totalSize = initialSource.getEstimatedSizeBytes(options);
long defaultSplitSize = totalSize / numSplits;
long maxSplitSize = options.as(FlinkPipelineOptions.class).getFileInputSplitMaxSizeMB();
long maxSplitSize = 0;
if (options != null) {
maxSplitSize = options.as(FlinkPipelineOptions.class).getFileInputSplitMaxSizeMB();
}
if (initialSource instanceof FileBasedSource && maxSplitSize > 0) {
// Most of the time parallelism is < number of files in source.
// Each file becomes a unique split which commonly create skew.
Expand Down

0 comments on commit 19f9423

Please sign in to comment.