From bf47289b1cd2bb0e23fb4017c329f83f410e7845 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Stankiewicz?= Date: Wed, 8 Nov 2023 15:59:17 +0100 Subject: [PATCH] Add header removal for TextIO (#29202) * add header removal * add multiple header lines removal --- CHANGES.md | 2 + .../java/org/apache/beam/sdk/io/TextIO.java | 41 ++++++++-- .../beam/sdk/io/TextRowCountEstimator.java | 10 ++- .../org/apache/beam/sdk/io/TextSource.java | 64 +++++++++++++--- .../apache/beam/sdk/io/TextIOReadTest.java | 75 ++++++++++++++++--- 5 files changed, 164 insertions(+), 28 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 17ac8a1d7010..523ef3455aec 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -63,6 +63,7 @@ ## I/Os * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* TextIO now supports skipping multiple header lines (Java) ([#17990](https://github.com/apache/beam/issues/17990)). ## New Features / Improvements @@ -107,6 +108,7 @@ should handle this. ([#25252](https://github.com/apache/beam/issues/25252)). * state amd side input cache has been enabled to a default of 100 MB. Use `--max_cache_memory_usage_mb=X` to provide cache size for the user state API and side inputs. (Python) ([#28770](https://github.com/apache/beam/issues/28770)). * Beam YAML stable release. Beam pipelines can now be written using YAML and leverage the Beam YAML framework which includes a preliminary set of IO's and turnkey transforms. More information can be found in the YAML root folder and in the [README](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/README.md). + ## Breaking Changes * `org.apache.beam.sdk.io.CountingSource.CounterMark` uses custom `CounterMarkCoder` as a default coder since all Avro-dependent diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 33beff23b311..2c7a4fc5d4f5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -191,6 +191,7 @@ public static Read read() { return new AutoValue_TextIO_Read.Builder() .setCompression(Compression.AUTO) .setHintMatchesManyFiles(false) + .setSkipHeaderLines(0) .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)) .build(); } @@ -214,6 +215,7 @@ public static Read read() { public static ReadAll readAll() { return new AutoValue_TextIO_ReadAll.Builder() .setCompression(Compression.AUTO) + .setSkipHeaderLines(0) .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD)) .build(); } @@ -228,6 +230,7 @@ public static ReadFiles readFiles() { // but is not so large as to exhaust a typical runner's maximum amount of output per // ProcessElement call. .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES) + .setSkipHeaderLines(0) .build(); } @@ -286,6 +289,8 @@ public abstract static class Read extends PTransform @SuppressWarnings("mutable") // this returns an array that can be mutated by the caller abstract byte @Nullable [] getDelimiter(); + abstract int getSkipHeaderLines(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -300,6 +305,8 @@ abstract static class Builder { abstract Builder setDelimiter(byte @Nullable [] delimiter); + abstract Builder setSkipHeaderLines(int skipHeaderLines); + abstract Read build(); } @@ -396,6 +403,10 @@ public Read withDelimiter(byte[] delimiter) { return toBuilder().setDelimiter(delimiter).build(); } + public Read withSkipHeaderLines(int skipHeaderLines) { + return toBuilder().setSkipHeaderLines(skipHeaderLines).build(); + } + static boolean isSelfOverlapping(byte[] s) { // s self-overlaps if v exists such as s = vu = wv with u and w non empty for (int i = 1; i < s.length - 1; ++i) { @@ -422,7 +433,9 @@ public PCollection expand(PBegin input) { FileIO.readMatches() .withCompression(getCompression()) .withDirectoryTreatment(DirectoryTreatment.PROHIBIT)) - .apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter())); + .apply( + "Via ReadFiles", + readFiles().withDelimiter(getDelimiter()).withSkipHeaderLines(getSkipHeaderLines())); } // Helper to create a source specific to the requested compression type. @@ -431,7 +444,8 @@ protected FileBasedSource getSource() { new TextSource( getFilepattern(), getMatchConfiguration().getEmptyMatchTreatment(), - getDelimiter())) + getDelimiter(), + getSkipHeaderLines())) .withCompression(getCompression()); } @@ -468,6 +482,8 @@ public abstract static class ReadAll @SuppressWarnings("mutable") // this returns an array that can be mutated by the caller abstract byte @Nullable [] getDelimiter(); + abstract int getSkipHeaderLines(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -478,6 +494,8 @@ abstract static class Builder { abstract Builder setDelimiter(byte @Nullable [] delimiter); + abstract Builder setSkipHeaderLines(int skipHeaderLines); + abstract ReadAll build(); } @@ -560,6 +578,8 @@ public abstract static class ReadFiles @SuppressWarnings("mutable") // this returns an array that can be mutated by the caller abstract byte @Nullable [] getDelimiter(); + abstract int getSkipHeaderLines(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -568,6 +588,8 @@ abstract static class Builder { abstract Builder setDelimiter(byte @Nullable [] delimiter); + abstract Builder setSkipHeaderLines(int skipHeaderLines); + abstract ReadFiles build(); } @@ -581,13 +603,17 @@ public ReadFiles withDelimiter(byte[] delimiter) { return toBuilder().setDelimiter(delimiter).build(); } + public ReadFiles withSkipHeaderLines(int skipHeaderLines) { + return toBuilder().setSkipHeaderLines(skipHeaderLines).build(); + } + @Override public PCollection expand(PCollection input) { return input.apply( "Read all via FileBasedSource", new ReadAllViaFileBasedSource<>( getDesiredBundleSizeBytes(), - new CreateTextSourceFn(getDelimiter()), + new CreateTextSourceFn(getDelimiter(), getSkipHeaderLines()), StringUtf8Coder.of())); } @@ -602,15 +628,20 @@ public void populateDisplayData(DisplayData.Builder builder) { private static class CreateTextSourceFn implements SerializableFunction> { private byte[] delimiter; + private int skipHeaderLines; - private CreateTextSourceFn(byte[] delimiter) { + private CreateTextSourceFn(byte[] delimiter, int skipHeaderLines) { this.delimiter = delimiter; + this.skipHeaderLines = skipHeaderLines; } @Override public FileBasedSource apply(String input) { return new TextSource( - StaticValueProvider.of(input), EmptyMatchTreatment.DISALLOW, delimiter); + StaticValueProvider.of(input), + EmptyMatchTreatment.DISALLOW, + delimiter, + skipHeaderLines); } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java index 32b7fb12f414..8542ce011098 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java @@ -46,6 +46,8 @@ public abstract class TextRowCountEstimator { @SuppressWarnings("mutable") public abstract byte @Nullable [] getDelimiters(); + public abstract int getSkipHeaderLines(); + public abstract String getFilePattern(); public abstract Compression getCompression(); @@ -62,7 +64,8 @@ public static TextRowCountEstimator.Builder builder() { .setNumSampledBytesPerFile(DEFAULT_NUM_BYTES_PER_FILE) .setCompression(DEFAULT_COMPRESSION) .setDirectoryTreatment(DEFAULT_DIRECTORY_TREATMENT) - .setEmptyMatchTreatment(DEFAULT_EMPTY_MATCH_TREATMENT); + .setEmptyMatchTreatment(DEFAULT_EMPTY_MATCH_TREATMENT) + .setSkipHeaderLines(0); } /** @@ -114,7 +117,8 @@ public Double estimateRowCount(PipelineOptions pipelineOptions) new TextSource( ValueProvider.StaticValueProvider.of(file.getMetadata().resourceId().toString()), getEmptyMatchTreatment(), - getDelimiters()); + getDelimiters(), + getSkipHeaderLines()); FileBasedSource source = CompressedSource.from(textSource).withCompression(file.getCompression()); try (BoundedSource.BoundedReader reader = @@ -160,6 +164,8 @@ public abstract Builder setDirectoryTreatment( public abstract Builder setDelimiters(byte @Nullable [] delimiters); + public abstract Builder setSkipHeaderLines(int skipHeaderLines); + public abstract Builder setFilePattern(String filePattern); public abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java index bef30dffa8ac..3d62c677950a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java @@ -56,26 +56,43 @@ public class TextSource extends FileBasedSource { byte[] delimiter; + int skipHeaderLines; + public TextSource( - ValueProvider fileSpec, EmptyMatchTreatment emptyMatchTreatment, byte[] delimiter) { + ValueProvider fileSpec, + EmptyMatchTreatment emptyMatchTreatment, + byte[] delimiter, + int skipHeaderLines) { super(fileSpec, emptyMatchTreatment, 1L); this.delimiter = delimiter; + this.skipHeaderLines = skipHeaderLines; } - public TextSource(MatchResult.Metadata metadata, long start, long end, byte[] delimiter) { + public TextSource( + ValueProvider fileSpec, EmptyMatchTreatment emptyMatchTreatment, byte[] delimiter) { + this(fileSpec, emptyMatchTreatment, delimiter, 0); + } + + public TextSource( + MatchResult.Metadata metadata, long start, long end, byte[] delimiter, int skipHeaderLines) { super(metadata, 1L, start, end); this.delimiter = delimiter; + this.skipHeaderLines = skipHeaderLines; + } + + public TextSource(MatchResult.Metadata metadata, long start, long end, byte[] delimiter) { + this(metadata, start, end, delimiter, 0); } @Override protected FileBasedSource createForSubrangeOfFile( MatchResult.Metadata metadata, long start, long end) { - return new TextSource(metadata, start, end, delimiter); + return new TextSource(metadata, start, end, delimiter, skipHeaderLines); } @Override protected FileBasedReader createSingleFileReader(PipelineOptions options) { - return new TextBasedReader(this, delimiter); + return new TextBasedReader(this, delimiter, skipHeaderLines); } @Override @@ -98,6 +115,7 @@ static class TextBasedReader extends FileBasedReader { private static final byte LF = '\n'; private final byte @Nullable [] delimiter; + private final int skipHeaderLines; private final ByteArrayOutputStream str; private final byte[] buffer; private final ByteBuffer byteBuffer; @@ -112,11 +130,16 @@ static class TextBasedReader extends FileBasedReader { private boolean skipLineFeedAtStart; // skip an LF if at the start of the next buffer private TextBasedReader(TextSource source, byte[] delimiter) { + this(source, delimiter, 0); + } + + private TextBasedReader(TextSource source, byte[] delimiter, int skipHeaderLines) { super(source); this.buffer = new byte[READ_BUFFER_SIZE]; this.str = new ByteArrayOutputStream(); this.byteBuffer = ByteBuffer.wrap(buffer); this.delimiter = delimiter; + this.skipHeaderLines = skipHeaderLines; } @Override @@ -171,21 +194,42 @@ protected void startReading(ReadableByteChannel channel) throws IOException { } else { startOfNextRecord = bufferPosn = (int) requiredPosition; } + skipHeader(skipHeaderLines, true); } else { - ((SeekableByteChannel) channel).position(requiredPosition); - startOfNextRecord = requiredPosition; + skipHeader(skipHeaderLines, false); + if (requiredPosition > startOfNextRecord) { + ((SeekableByteChannel) channel).position(requiredPosition); + startOfNextRecord = requiredPosition; + bufferLength = bufferPosn = 0; + } + // Read and discard the next record ensuring that startOfNextRecord and bufferPosn point + // to the beginning of the next record. + readNextRecord(); + currentValue = null; } - // Read and discard the next record ensuring that startOfNextRecord and bufferPosn point - // to the beginning of the next record. - readNextRecord(); - currentValue = null; } else { // Check to see if we start with the UTF_BOM bytes skipping them if present. if (fileStartsWithBom()) { startOfNextRecord = bufferPosn = UTF8_BOM.size(); } + skipHeader(skipHeaderLines, false); + } + } + + private void skipHeader(int headerLines, boolean skipFirstLine) throws IOException { + if (headerLines == 1) { + readNextRecord(); + } else if (headerLines > 1) { + // this will be expensive + ((SeekableByteChannel) inChannel).position(0); + for (int line = 0; line < headerLines; ++line) { + readNextRecord(); + } + } else if (headerLines == 0 && skipFirstLine) { + readNextRecord(); } + currentValue = null; } private boolean fileStartsWithBom() throws IOException { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java index 379345b1001e..84c05ee6c906 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java @@ -245,15 +245,24 @@ private static File createZipFile( } private static TextSource prepareSource( - TemporaryFolder temporaryFolder, byte[] data, @Nullable byte[] delimiter) throws IOException { + TemporaryFolder temporaryFolder, byte[] data, @Nullable byte[] delimiter, int skipHeaderLines) + throws IOException { Path path = temporaryFolder.newFile().toPath(); Files.write(path, data); - return getTextSource(path.toString(), delimiter); + return getTextSource(path.toString(), delimiter, skipHeaderLines); } - public static TextSource getTextSource(String path, @Nullable byte[] delimiter) { + public static TextSource getTextSource( + String path, @Nullable byte[] delimiter, int skipHeaderLines) { return new TextSource( - ValueProvider.StaticValueProvider.of(path), EmptyMatchTreatment.DISALLOW, delimiter); + ValueProvider.StaticValueProvider.of(path), + EmptyMatchTreatment.DISALLOW, + delimiter, + skipHeaderLines); + } + + public static TextSource getTextSource(String path, @Nullable byte[] delimiter) { + return getTextSource(path, delimiter, 0); } private static String getFileSuffix(Compression compression) { @@ -384,7 +393,7 @@ public void testReadLinesWithDefaultDelimiterAndZeroAndOneLengthReturningChannel Files.write(path, line.getBytes(UTF_8)); Metadata metadata = FileSystems.matchSingleFileSpec(path.toString()); FileBasedSource source = - getTextSource(path.toString(), null) + getTextSource(path.toString(), null, 0) .createForSubrangeOfFile(metadata, 0, metadata.sizeBytes()); FileBasedReader reader = source.createSingleFileReader(PipelineOptionsFactory.create()); @@ -433,7 +442,49 @@ public void testSplittingSource() throws Exception { } private TextSource prepareSource(byte[] data) throws IOException { - return TextIOReadTest.prepareSource(tempFolder, data, null); + return TextIOReadTest.prepareSource(tempFolder, data, null, 0); + } + + private void runTestReadWithData(byte[] data, List expectedResults) throws Exception { + TextSource source = prepareSource(data); + List actual = SourceTestUtils.readFromSource(source, PipelineOptionsFactory.create()); + assertThat( + actual, containsInAnyOrder(new ArrayList<>(expectedResults).toArray(new String[0]))); + } + } + + /** Tests for reading files with/without header. */ + @RunWith(Parameterized.class) + public static class SkippingHeaderTest { + private static final ImmutableList EXPECTED = ImmutableList.of("asdf", "hjkl", "xyz"); + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Parameterized.Parameters(name = "{index}: {0}") + public static Iterable data() { + return ImmutableList.builder() + .add(new Object[] {"\n\n\n", ImmutableList.of("", ""), 1}) + .add(new Object[] {"\n", ImmutableList.of(), 1}) + .add(new Object[] {"header\nasdf\nhjkl\nxyz\n", EXPECTED, 1}) + .add(new Object[] {"header1\nheader2\nasdf\nhjkl\nxyz\n", EXPECTED, 2}) + .build(); + } + + @Parameterized.Parameter(0) + public String line; + + @Parameterized.Parameter(1) + public ImmutableList expected; + + @Parameterized.Parameter(2) + public int skipHeaderLines; + + @Test + public void testReadLines() throws Exception { + runTestReadWithData(line.getBytes(UTF_8), expected); + } + + private TextSource prepareSource(byte[] data) throws IOException { + return TextIOReadTest.prepareSource(tempFolder, data, null, skipHeaderLines); } private void runTestReadWithData(byte[] data, List expectedResults) throws Exception { @@ -477,7 +528,8 @@ public static Iterable data() { @Test public void testReadLinesWithCustomDelimiter() throws Exception { SourceTestUtils.assertSplitAtFractionExhaustive( - TextIOReadTest.prepareSource(tempFolder, testCase.getBytes(UTF_8), new byte[] {'|', '*'}), + TextIOReadTest.prepareSource( + tempFolder, testCase.getBytes(UTF_8), new byte[] {'|', '*'}, 0), PipelineOptionsFactory.create()); } @@ -489,7 +541,7 @@ public void testReadLinesWithCustomDelimiterAndZeroAndOneLengthReturningChannel( Files.write(path, testCase.getBytes(UTF_8)); Metadata metadata = FileSystems.matchSingleFileSpec(path.toString()); FileBasedSource source = - getTextSource(path.toString(), delimiter) + getTextSource(path.toString(), delimiter, 0) .createForSubrangeOfFile(metadata, 0, metadata.sizeBytes()); FileBasedReader reader = source.createSingleFileReader(PipelineOptionsFactory.create()); @@ -743,7 +795,7 @@ public void testTextIOGetName() { } private TextSource prepareSource(byte[] data) throws IOException { - return TextIOReadTest.prepareSource(tempFolder, data, null); + return TextIOReadTest.prepareSource(tempFolder, data, null, 0); } @Test @@ -977,7 +1029,8 @@ public void testReadFilesWithFilename() throws IOException { new TextSource( ValueProvider.StaticValueProvider.of(input), EmptyMatchTreatment.DISALLOW, - new byte[] {'\n'}); + new byte[] {'\n'}, + 0); PCollection> lines = p.apply( @@ -1102,7 +1155,7 @@ public void processElement(ProcessContext c) { ValueProvider.StaticValueProvider.of(file.getMetadata().resourceId().getFilename()); // Create a TextSource, passing null as the delimiter to use the default // delimiters ('\n', '\r', or '\r\n'). - TextSource textSource = new TextSource(filenameProvider, null, null); + TextSource textSource = new TextSource(filenameProvider, null, null, 0); try { BoundedSource.BoundedReader reader = textSource