Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add header removal for TextIO #29202

Merged
merged 21 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,41 @@
* ([#X](https://github.com/apache/beam/issues/X)).
-->

# [2.53.0] - Unreleased

## Highlights

* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).

## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* TextIO now supports skipping multiple header lines (Java) ([#17990](https://github.com/apache/beam/issues/17990)).

## New Features / Improvements

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## Breaking Changes

* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).

## Deprecations

* X behavior is deprecated and will be removed in X versions ([#X](https://github.com/apache/beam/issues/X)).

## Bugfixes

* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## Security Fixes
* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).

## Known Issues

* ([#X](https://github.com/apache/beam/issues/X)).

# [2.52.0] - Unreleased

## Highlights
Expand All @@ -77,6 +112,7 @@ should handle this. ([#25252](https://github.com/apache/beam/issues/25252)).
* state amd side input cache has been enabled to a default of 100 MB. Use `--max_cache_memory_usage_mb=X` to provide cache size for the user state API and side inputs. (Python) ([#28770](https://github.com/apache/beam/issues/28770)).
* Beam YAML stable release. Beam pipelines can now be written using YAML and leverage the Beam YAML framework which includes a preliminary set of IO's and turnkey transforms. More information can be found in the YAML root folder and in the [README](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/README.md).


## Breaking Changes

* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
Expand Down
41 changes: 36 additions & 5 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ public static Read read() {
return new AutoValue_TextIO_Read.Builder()
.setCompression(Compression.AUTO)
.setHintMatchesManyFiles(false)
.setSkipHeaderLines(0)
.setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
.build();
}
Expand All @@ -214,6 +215,7 @@ public static Read read() {
public static ReadAll readAll() {
return new AutoValue_TextIO_ReadAll.Builder()
.setCompression(Compression.AUTO)
.setSkipHeaderLines(0)
.setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
.build();
}
Expand All @@ -228,6 +230,7 @@ public static ReadFiles readFiles() {
// but is not so large as to exhaust a typical runner's maximum amount of output per
// ProcessElement call.
.setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
.setSkipHeaderLines(0)
.build();
}

Expand Down Expand Up @@ -286,6 +289,8 @@ public abstract static class Read extends PTransform<PBegin, PCollection<String>
@SuppressWarnings("mutable") // this returns an array that can be mutated by the caller
abstract byte @Nullable [] getDelimiter();

abstract int getSkipHeaderLines();

abstract Builder toBuilder();

@AutoValue.Builder
Expand All @@ -300,6 +305,8 @@ abstract static class Builder {

abstract Builder setDelimiter(byte @Nullable [] delimiter);

abstract Builder setSkipHeaderLines(int skipHeaderLines);

abstract Read build();
}

Expand Down Expand Up @@ -396,6 +403,10 @@ public Read withDelimiter(byte[] delimiter) {
return toBuilder().setDelimiter(delimiter).build();
}

public Read withSkipHeaderLines(int skipHeaderLines) {
return toBuilder().setSkipHeaderLines(skipHeaderLines).build();
}

static boolean isSelfOverlapping(byte[] s) {
// s self-overlaps if v exists such as s = vu = wv with u and w non empty
for (int i = 1; i < s.length - 1; ++i) {
Expand All @@ -422,7 +433,9 @@ public PCollection<String> expand(PBegin input) {
FileIO.readMatches()
.withCompression(getCompression())
.withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
.apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter()));
.apply(
"Via ReadFiles",
readFiles().withDelimiter(getDelimiter()).withSkipHeaderLines(getSkipHeaderLines()));
}

// Helper to create a source specific to the requested compression type.
Expand All @@ -431,7 +444,8 @@ protected FileBasedSource<String> getSource() {
new TextSource(
getFilepattern(),
getMatchConfiguration().getEmptyMatchTreatment(),
getDelimiter()))
getDelimiter(),
getSkipHeaderLines()))
.withCompression(getCompression());
}

Expand Down Expand Up @@ -468,6 +482,8 @@ public abstract static class ReadAll
@SuppressWarnings("mutable") // this returns an array that can be mutated by the caller
abstract byte @Nullable [] getDelimiter();

abstract int getSkipHeaderLines();

abstract Builder toBuilder();

@AutoValue.Builder
Expand All @@ -478,6 +494,8 @@ abstract static class Builder {

abstract Builder setDelimiter(byte @Nullable [] delimiter);

abstract Builder setSkipHeaderLines(int skipHeaderLines);

abstract ReadAll build();
}

Expand Down Expand Up @@ -560,6 +578,8 @@ public abstract static class ReadFiles
@SuppressWarnings("mutable") // this returns an array that can be mutated by the caller
abstract byte @Nullable [] getDelimiter();

abstract int getSkipHeaderLines();

abstract Builder toBuilder();

@AutoValue.Builder
Expand All @@ -568,6 +588,8 @@ abstract static class Builder {

abstract Builder setDelimiter(byte @Nullable [] delimiter);

abstract Builder setSkipHeaderLines(int skipHeaderLines);

abstract ReadFiles build();
}

Expand All @@ -581,13 +603,17 @@ public ReadFiles withDelimiter(byte[] delimiter) {
return toBuilder().setDelimiter(delimiter).build();
}

public ReadFiles withSkipHeaderLines(int skipHeaderLines) {
return toBuilder().setSkipHeaderLines(skipHeaderLines).build();
}

@Override
public PCollection<String> expand(PCollection<FileIO.ReadableFile> input) {
return input.apply(
"Read all via FileBasedSource",
new ReadAllViaFileBasedSource<>(
getDesiredBundleSizeBytes(),
new CreateTextSourceFn(getDelimiter()),
new CreateTextSourceFn(getDelimiter(), getSkipHeaderLines()),
StringUtf8Coder.of()));
}

Expand All @@ -602,15 +628,20 @@ public void populateDisplayData(DisplayData.Builder builder) {
private static class CreateTextSourceFn
implements SerializableFunction<String, FileBasedSource<String>> {
private byte[] delimiter;
private int skipHeaderLines;

private CreateTextSourceFn(byte[] delimiter) {
private CreateTextSourceFn(byte[] delimiter, int skipHeaderLines) {
this.delimiter = delimiter;
this.skipHeaderLines = skipHeaderLines;
}

@Override
public FileBasedSource<String> apply(String input) {
return new TextSource(
StaticValueProvider.of(input), EmptyMatchTreatment.DISALLOW, delimiter);
StaticValueProvider.of(input),
EmptyMatchTreatment.DISALLOW,
delimiter,
skipHeaderLines);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public abstract class TextRowCountEstimator {
@SuppressWarnings("mutable")
public abstract byte @Nullable [] getDelimiters();

public abstract int getSkipHeaderLines();

public abstract String getFilePattern();

public abstract Compression getCompression();
Expand All @@ -62,7 +64,8 @@ public static TextRowCountEstimator.Builder builder() {
.setNumSampledBytesPerFile(DEFAULT_NUM_BYTES_PER_FILE)
.setCompression(DEFAULT_COMPRESSION)
.setDirectoryTreatment(DEFAULT_DIRECTORY_TREATMENT)
.setEmptyMatchTreatment(DEFAULT_EMPTY_MATCH_TREATMENT);
.setEmptyMatchTreatment(DEFAULT_EMPTY_MATCH_TREATMENT)
.setSkipHeaderLines(0);
}

/**
Expand Down Expand Up @@ -114,7 +117,8 @@ public Double estimateRowCount(PipelineOptions pipelineOptions)
new TextSource(
ValueProvider.StaticValueProvider.of(file.getMetadata().resourceId().toString()),
getEmptyMatchTreatment(),
getDelimiters());
getDelimiters(),
getSkipHeaderLines());
FileBasedSource<String> source =
CompressedSource.from(textSource).withCompression(file.getCompression());
try (BoundedSource.BoundedReader<String> reader =
Expand Down Expand Up @@ -160,6 +164,8 @@ public abstract Builder setDirectoryTreatment(

public abstract Builder setDelimiters(byte @Nullable [] delimiters);

public abstract Builder setSkipHeaderLines(int skipHeaderLines);

public abstract Builder setFilePattern(String filePattern);

public abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
Expand Down
64 changes: 54 additions & 10 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,26 +56,43 @@
public class TextSource extends FileBasedSource<String> {
byte[] delimiter;

int skipHeaderLines;

public TextSource(
ValueProvider<String> fileSpec, EmptyMatchTreatment emptyMatchTreatment, byte[] delimiter) {
ValueProvider<String> fileSpec,
EmptyMatchTreatment emptyMatchTreatment,
byte[] delimiter,
int skipHeaderLines) {
super(fileSpec, emptyMatchTreatment, 1L);
this.delimiter = delimiter;
this.skipHeaderLines = skipHeaderLines;
}

public TextSource(MatchResult.Metadata metadata, long start, long end, byte[] delimiter) {
public TextSource(
ValueProvider<String> fileSpec, EmptyMatchTreatment emptyMatchTreatment, byte[] delimiter) {
this(fileSpec, emptyMatchTreatment, delimiter, 0);
}

public TextSource(
MatchResult.Metadata metadata, long start, long end, byte[] delimiter, int skipHeaderLines) {
super(metadata, 1L, start, end);
this.delimiter = delimiter;
this.skipHeaderLines = skipHeaderLines;
}

public TextSource(MatchResult.Metadata metadata, long start, long end, byte[] delimiter) {
this(metadata, start, end, delimiter, 0);
}

@Override
protected FileBasedSource<String> createForSubrangeOfFile(
MatchResult.Metadata metadata, long start, long end) {
return new TextSource(metadata, start, end, delimiter);
return new TextSource(metadata, start, end, delimiter, skipHeaderLines);
}

@Override
protected FileBasedReader<String> createSingleFileReader(PipelineOptions options) {
return new TextBasedReader(this, delimiter);
return new TextBasedReader(this, delimiter, skipHeaderLines);
}

@Override
Expand All @@ -98,6 +115,7 @@ static class TextBasedReader extends FileBasedReader<String> {
private static final byte LF = '\n';

private final byte @Nullable [] delimiter;
private final int skipHeaderLines;
private final ByteArrayOutputStream str;
private final byte[] buffer;
private final ByteBuffer byteBuffer;
Expand All @@ -112,11 +130,16 @@ static class TextBasedReader extends FileBasedReader<String> {
private boolean skipLineFeedAtStart; // skip an LF if at the start of the next buffer

private TextBasedReader(TextSource source, byte[] delimiter) {
this(source, delimiter, 0);
}

private TextBasedReader(TextSource source, byte[] delimiter, int skipHeaderLines) {
super(source);
this.buffer = new byte[READ_BUFFER_SIZE];
this.str = new ByteArrayOutputStream();
this.byteBuffer = ByteBuffer.wrap(buffer);
this.delimiter = delimiter;
this.skipHeaderLines = skipHeaderLines;
}

@Override
Expand Down Expand Up @@ -171,21 +194,42 @@ protected void startReading(ReadableByteChannel channel) throws IOException {
} else {
startOfNextRecord = bufferPosn = (int) requiredPosition;
}
skipHeader(skipHeaderLines, true);
} else {
((SeekableByteChannel) channel).position(requiredPosition);
startOfNextRecord = requiredPosition;
skipHeader(skipHeaderLines, false);
if (requiredPosition > startOfNextRecord) {
Copy link

@ghost ghost Nov 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering what will happen if multiple readers having requiredPosition <= startOfNextRecord here.
In this case, more than one readers are assigned to a start position that falls in the header lines (assuming there are multiple long header lines). Not sure if the current code is handling this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm. In this case, among those readers, only one will have an end position outside of the header lines, and will continue to read. So this should not be an issue then.

((SeekableByteChannel) channel).position(requiredPosition);
startOfNextRecord = requiredPosition;
bufferLength = bufferPosn = 0;
}
// Read and discard the next record ensuring that startOfNextRecord and bufferPosn point
// to the beginning of the next record.
readNextRecord();
currentValue = null;
}

// Read and discard the next record ensuring that startOfNextRecord and bufferPosn point
// to the beginning of the next record.
readNextRecord();
currentValue = null;
} else {
// Check to see if we start with the UTF_BOM bytes skipping them if present.
if (fileStartsWithBom()) {
startOfNextRecord = bufferPosn = UTF8_BOM.size();
}
skipHeader(skipHeaderLines, false);
}
}

private void skipHeader(int headerLines, boolean skipFirstLine) throws IOException {
if (headerLines == 1) {
readNextRecord();
} else if (headerLines > 1) {
Copy link

@ghost ghost Nov 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also add a test to cover the skipping of multiple header lines?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

// this will be expensive
((SeekableByteChannel) inChannel).position(0);
for (int line = 0; line < headerLines; ++line) {
readNextRecord();
}
} else if (headerLines == 0 && skipFirstLine) {
readNextRecord();
}
currentValue = null;
}

private boolean fileStartsWithBom() throws IOException {
Expand Down
Loading
Loading