From 089fa2a5b014a5d65e378189c48af9f0c306c5a3 Mon Sep 17 00:00:00 2001 From: Alexey Romanenko <33895511+aromanenko-dev@users.noreply.github.com> Date: Wed, 26 Oct 2022 18:03:14 +0200 Subject: [PATCH] [23832] Remove ParquetIO.withSplit (closes #23832) (#23833) --- CHANGES.md | 3 + .../apache/beam/sdk/io/parquet/ParquetIO.java | 236 ++---------------- .../beam/sdk/io/parquet/ParquetIOTest.java | 109 +------- .../beam/sdk/tpcds/SqlTransformRunner.java | 1 - 4 files changed, 26 insertions(+), 323 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 520609504c80..f6a17cc79a2f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -58,6 +58,9 @@ than requiring them to be passed separately via the `--extra_package` option. ([#23684](https://github.com/apache/beam/pull/23684)) +## Breaking Changes + +* `ParquetIO.withSplit` was removed ([#23832](https://github.com/apache/beam/issues/23832)). # [2.43.0] - Unreleased diff --git a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java index ee67989c5f42..433a53a20fe1 100644 --- a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java +++ b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java @@ -46,9 +46,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.FileIO; import org.apache.beam.sdk.io.FileIO.ReadableFile; -import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.io.hadoop.SerializableConfiguration; -import org.apache.beam.sdk.io.parquet.ParquetIO.ReadFiles.ReadFn; import org.apache.beam.sdk.io.parquet.ParquetIO.ReadFiles.SplitReadFn; import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.options.ValueProvider; @@ -77,7 +75,6 @@ import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.filter2.compat.FilterCompat.Filter; import org.apache.parquet.hadoop.ParquetFileReader; -import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.api.InitContext; import org.apache.parquet.hadoop.api.ReadSupport; @@ -136,18 +133,8 @@ * PCollection output = files.apply(ParquetIO.readFiles(SCHEMA)); * } * - *
- * <p>Splittable reading can be enabled by allowing the use of Splittable DoFn. It initially split
- * the files into blocks of 64MB and may dynamically split further for higher read efficiency. It
- * can be enabled by using {@link ParquetIO.Read#withSplit()}.
- *
- * <p>For example:
- *
- * <pre>{@code
- * PCollection<GenericRecord> records = pipeline.apply(ParquetIO.read(SCHEMA).from("/foo/bar").withSplit());
- * ...
- * }</pre>
- *
- * <p>Since Beam version 2.35.0 the splittable reading is enabled by default.
+ * <p>ParquetIO uses splittable reading via Splittable DoFn. It initially splits the files
+ * into blocks of 64MB and may dynamically split further for higher read efficiency.
 *
 * <p>
Reading with projection can be enabled with the projection schema as following. Splittable * reading is enabled when reading with projection. The projection_schema contains only the column @@ -271,7 +258,6 @@ public static Read read(Schema schema) { return new AutoValue_ParquetIO_Read.Builder() .setSchema(schema) .setInferBeamSchema(false) - .setSplittable(true) .build(); } @@ -283,7 +269,6 @@ public static ReadFiles readFiles(Schema schema) { return new AutoValue_ParquetIO_ReadFiles.Builder() .setSchema(schema) .setInferBeamSchema(false) - .setSplittable(true) .build(); } @@ -292,10 +277,7 @@ public static ReadFiles readFiles(Schema schema) { * pattern) and converts to user defined type using provided parseFn. */ public static Parse parseGenericRecords(SerializableFunction parseFn) { - return new AutoValue_ParquetIO_Parse.Builder() - .setParseFn(parseFn) - .setSplittable(true) - .build(); + return new AutoValue_ParquetIO_Parse.Builder().setParseFn(parseFn).build(); } /** @@ -304,10 +286,7 @@ public static Parse parseGenericRecords(SerializableFunction ParseFiles parseFilesGenericRecords( SerializableFunction parseFn) { - return new AutoValue_ParquetIO_ParseFiles.Builder() - .setParseFn(parseFn) - .setSplittable(true) - .build(); + return new AutoValue_ParquetIO_ParseFiles.Builder().setParseFn(parseFn).build(); } /** Implementation of {@link #read(Schema)}. */ @@ -328,8 +307,6 @@ public abstract static class Read extends PTransform filepattern); abstract Builder setSchema(Schema schema); @@ -367,7 +342,6 @@ public Read from(String filepattern) { public Read withProjection(Schema projectionSchema, Schema encoderSchema) { return toBuilder() .setProjectionSchema(projectionSchema) - .setSplittable(true) .setEncoderSchema(encoderSchema) .build(); } @@ -389,28 +363,6 @@ public Read withBeamSchemas(boolean inferBeamSchema) { return toBuilder().setInferBeamSchema(inferBeamSchema).build(); } - /** - * Enable the Splittable reading. - * - * @deprecated as of version 2.35.0. Splittable reading is enabled by default. - */ - @Deprecated - public Read withSplit() { - return toBuilder().setSplittable(true).build(); - } - - /** - * Disable the Splittable reading. - * - * @deprecated This method may currently be used to opt-out of the default, splittable, - * behavior. However, this will be removed in a future release assuming no issues are - * discovered. - */ - @Deprecated - public Read withoutSplit() { - return toBuilder().setSplittable(false).build(); - } - /** * Define the Avro data model; see {@link AvroParquetReader.Builder#withDataModel(GenericData)}. 
*/ @@ -431,10 +383,8 @@ public PCollection expand(PBegin input) { ReadFiles readFiles = readFiles(getSchema()) .withBeamSchemas(getInferBeamSchema()) - .withAvroDataModel(getAvroDataModel()); - if (isSplittable()) { - readFiles = readFiles.withSplit().withProjection(getProjectionSchema(), getEncoderSchema()); - } + .withAvroDataModel(getAvroDataModel()) + .withProjection(getProjectionSchema(), getEncoderSchema()); if (getConfiguration() != null) { readFiles = readFiles.withConfiguration(getConfiguration().get()); } @@ -452,7 +402,6 @@ public void populateDisplayData(DisplayData.Builder builder) { .add( DisplayData.item("inferBeamSchema", getInferBeamSchema()) .withLabel("Infer Beam Schema")) - .add(DisplayData.item("splittable", isSplittable())) .addIfNotNull(DisplayData.item("projectionSchema", String.valueOf(getProjectionSchema()))) .addIfNotNull(DisplayData.item("avroDataModel", String.valueOf(getAvroDataModel()))); if (this.getConfiguration() != null) { @@ -477,8 +426,6 @@ public abstract static class Parse extends PTransform> abstract @Nullable SerializableConfiguration getConfiguration(); - abstract boolean isSplittable(); - abstract Builder toBuilder(); @AutoValue.Builder @@ -491,8 +438,6 @@ abstract static class Builder { abstract Builder setConfiguration(SerializableConfiguration configuration); - abstract Builder setSplittable(boolean splittable); - abstract Parse build(); } @@ -521,28 +466,6 @@ public Parse withConfiguration(Configuration configuration) { return toBuilder().setConfiguration(new SerializableConfiguration(configuration)).build(); } - /** - * Enable the Splittable reading. - * - * @deprecated as of version 2.35.0. Splittable reading is enabled by default. - */ - @Deprecated - public Parse withSplit() { - return toBuilder().setSplittable(true).build(); - } - - /** - * Disable the Splittable reading. - * - * @deprecated This method may currently be used to opt-out of the default, splittable, - * behavior. However, this will be removed in a future release assuming no issues are - * discovered. 
- */ - @Deprecated - public Parse withoutSplit() { - return toBuilder().setSplittable(false).build(); - } - @Override public PCollection expand(PBegin input) { checkNotNull(getFilepattern(), "Filepattern cannot be null."); @@ -554,7 +477,6 @@ public PCollection expand(PBegin input) { parseFilesGenericRecords(getParseFn()) .toBuilder() .setCoder(getCoder()) - .setSplittable(isSplittable()) .setConfiguration(getConfiguration()) .build()); } @@ -565,7 +487,6 @@ public void populateDisplayData(DisplayData.Builder builder) { builder .addIfNotNull( DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern")) - .add(DisplayData.item("splittable", isSplittable())) .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function")); if (this.getCoder() != null) { builder.add(DisplayData.item("coder", getCoder().getClass())); @@ -592,8 +513,6 @@ public abstract static class ParseFiles abstract @Nullable SerializableConfiguration getConfiguration(); - abstract boolean isSplittable(); - abstract Builder toBuilder(); @AutoValue.Builder @@ -604,8 +523,6 @@ abstract static class Builder { abstract Builder setConfiguration(SerializableConfiguration configuration); - abstract Builder setSplittable(boolean split); - abstract ParseFiles build(); } @@ -626,43 +543,19 @@ public ParseFiles withConfiguration(Configuration configuration) { return toBuilder().setConfiguration(new SerializableConfiguration(configuration)).build(); } - /** - * Enable the Splittable reading. - * - * @deprecated as of version 2.35.0. Splittable reading is enabled by default. - */ - @Deprecated - public ParseFiles withSplit() { - return toBuilder().setSplittable(true).build(); - } - - /** - * Disable the Splittable reading. - * - * @deprecated This method may currently be used to opt-out of the default, splittable, - * behavior. However, this will be removed in a future release assuming no issues are - * discovered. - */ - @Deprecated - public ParseFiles withoutSplit() { - return toBuilder().setSplittable(false).build(); - } - @Override public PCollection expand(PCollection input) { checkArgument(!isGenericRecordOutput(), "Parse can't be used for reading as GenericRecord."); return input - .apply(ParDo.of(buildFileReadingFn())) + .apply(ParDo.of(new SplitReadFn<>(null, null, getParseFn(), getConfiguration()))) .setCoder(inferCoder(input.getPipeline().getCoderRegistry())); } @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder - .add(DisplayData.item("splittable", isSplittable())) - .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function")); + builder.add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function")); if (this.getCoder() != null) { builder.add(DisplayData.item("coder", getCoder().getClass())); } @@ -676,13 +569,6 @@ public void populateDisplayData(DisplayData.Builder builder) { } } - /** Returns Splittable or normal Parquet file reading DoFn. */ - private DoFn buildFileReadingFn() { - return isSplittable() - ? new SplitReadFn<>(null, null, getParseFn(), getConfiguration()) - : new ReadFn<>(null, getParseFn(), getConfiguration()); - } - /** Returns true if expected output is {@code PCollection}. 
*/ private boolean isGenericRecordOutput() { String outputType = TypeDescriptors.outputOf(getParseFn()).getType().getTypeName(); @@ -735,8 +621,6 @@ public abstract static class ReadFiles abstract boolean getInferBeamSchema(); - abstract boolean isSplittable(); - abstract Builder toBuilder(); @AutoValue.Builder @@ -753,8 +637,6 @@ abstract static class Builder { abstract Builder setInferBeamSchema(boolean inferBeamSchema); - abstract Builder setSplittable(boolean split); - abstract ReadFiles build(); } @@ -769,7 +651,6 @@ public ReadFiles withProjection(Schema projectionSchema, Schema encoderSchema) { return toBuilder() .setProjectionSchema(projectionSchema) .setEncoderSchema(encoderSchema) - .setSplittable(true) .build(); } @@ -790,32 +671,18 @@ public ReadFiles withBeamSchemas(boolean inferBeamSchema) { return toBuilder().setInferBeamSchema(inferBeamSchema).build(); } - /** - * Enable the Splittable reading. - * - * @deprecated as of version 2.35.0. Splittable reading is enabled by default. - */ - @Deprecated - public ReadFiles withSplit() { - return toBuilder().setSplittable(true).build(); - } - - /** - * Disable the Splittable reading. - * - * @deprecated This method may currently be used to opt-out of the default, splittable, - * behavior. However, this will be removed in a future release assuming no issues are - * discovered. - */ - @Deprecated - public ReadFiles withoutSplit() { - return toBuilder().setSplittable(false).build(); - } - @Override public PCollection expand(PCollection input) { checkNotNull(getSchema(), "Schema can not be null"); - return input.apply(ParDo.of(getReaderFn())).setCoder(getCollectionCoder()); + return input + .apply( + ParDo.of( + new SplitReadFn<>( + getAvroDataModel(), + getProjectionSchema(), + GenericRecordPassthroughFn.create(), + getConfiguration()))) + .setCoder(getCollectionCoder()); } @Override @@ -826,7 +693,6 @@ public void populateDisplayData(DisplayData.Builder builder) { .add( DisplayData.item("inferBeamSchema", getInferBeamSchema()) .withLabel("Infer Beam Schema")) - .add(DisplayData.item("splittable", isSplittable())) .addIfNotNull(DisplayData.item("projectionSchema", String.valueOf(getProjectionSchema()))) .addIfNotNull(DisplayData.item("avroDataModel", String.valueOf(getAvroDataModel()))); if (this.getConfiguration() != null) { @@ -839,26 +705,13 @@ public void populateDisplayData(DisplayData.Builder builder) { } } - /** Returns Parquet file reading function based on {@link #isSplittable()}. */ - private DoFn getReaderFn() { - return isSplittable() - ? new SplitReadFn<>( - getAvroDataModel(), - getProjectionSchema(), - GenericRecordPassthroughFn.create(), - getConfiguration()) - : new ReadFn<>( - getAvroDataModel(), GenericRecordPassthroughFn.create(), getConfiguration()); - } - /** * Returns {@link org.apache.beam.sdk.schemas.SchemaCoder} when using Beam schemas, {@link * AvroCoder} when not using Beam schema. */ @Experimental(Kind.SCHEMAS) private Coder getCollectionCoder() { - Schema coderSchema = - getProjectionSchema() != null && isSplittable() ? getEncoderSchema() : getSchema(); + Schema coderSchema = getProjectionSchema() != null ? getEncoderSchema() : getSchema(); return getInferBeamSchema() ? AvroUtils.schemaCoder(coderSchema) : AvroCoder.of(coderSchema); } @@ -1126,59 +979,6 @@ public Progress getProgress() { } } - /** - * @deprecated as of version 2.35.0. Splittable reading with {@link SplitReadFn} should be used - * instead. 
- */ - @Deprecated - static class ReadFn extends DoFn { - - private final Class modelClass; - - private final SerializableFunction parseFn; - - private final SerializableConfiguration configuration; - - ReadFn( - GenericData model, - SerializableFunction parseFn, - SerializableConfiguration configuration) { - this.modelClass = model != null ? model.getClass() : null; - this.parseFn = checkNotNull(parseFn, "GenericRecord parse function is null"); - this.configuration = configuration; - } - - @ProcessElement - public void processElement(@Element ReadableFile file, OutputReceiver receiver) - throws Exception { - if (!file.getMetadata().isReadSeekEfficient()) { - ResourceId filename = file.getMetadata().resourceId(); - throw new RuntimeException(String.format("File has to be seekable: %s", filename)); - } - - SeekableByteChannel seekableByteChannel = file.openSeekable(); - - AvroParquetReader.Builder builder = - (AvroParquetReader.Builder) - AvroParquetReader.builder( - new BeamParquetInputFile(seekableByteChannel)) - .withConf(SerializableConfiguration.newConfiguration(configuration)); - if (modelClass != null) { - // all GenericData implementations have a static get method - builder = builder.withDataModel(buildModelObject(modelClass)); - } - - try (ParquetReader reader = builder.build()) { - GenericRecord read; - while ((read = reader.read()) != null) { - receiver.output(parseFn.apply(read)); - } - } - - seekableByteChannel.close(); - } - } - private static class BeamParquetInputFile implements InputFile { private final SeekableByteChannel seekableByteChannel; diff --git a/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java index 5576be2a59f0..6dd67e3e511c 100644 --- a/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java +++ b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java @@ -184,7 +184,6 @@ public void testWriteAndRead() { mainPipeline.run().waitUntilFinish(); ParquetIO.Read read = ParquetIO.read(SCHEMA); - assertTrue(read.isSplittable()); PCollection readBack = readPipeline.apply(read.from(temporaryFolder.getRoot().getAbsolutePath() + "/*")); @@ -210,27 +209,6 @@ public void testWriteWithRowGroupSizeAndRead() { readPipeline.run().waitUntilFinish(); } - @Test - public void testWriteAndReadWithoutSplit() { - List records = generateGenericRecords(1000); - - mainPipeline - .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA))) - .apply( - FileIO.write() - .via(ParquetIO.sink(SCHEMA)) - .to(temporaryFolder.getRoot().getAbsolutePath())); - mainPipeline.run().waitUntilFinish(); - - PCollection readBackWithSplit = - readPipeline.apply( - ParquetIO.read(SCHEMA) - .from(temporaryFolder.getRoot().getAbsolutePath() + "/*") - .withoutSplit()); - PAssert.that(readBackWithSplit).containsInAnyOrder(records); - readPipeline.run().waitUntilFinish(); - } - @Test public void testWriteAndReadWithBeamSchema() { List records = generateGenericRecords(1000); @@ -255,7 +233,7 @@ public void testWriteAndReadWithBeamSchema() { } @Test - public void testWriteAndReadFilesAsJsonForWithSplitForUnknownSchema() { + public void testWriteAndReadFilesAsJsonForUnknownSchema() { List records = generateGenericRecords(1000); mainPipeline @@ -266,13 +244,12 @@ public void testWriteAndReadFilesAsJsonForWithSplitForUnknownSchema() { .to(temporaryFolder.getRoot().getAbsolutePath())); mainPipeline.run().waitUntilFinish(); - PCollection 
readBackAsJsonWithSplit = + PCollection readBackAsJson = readPipeline.apply( ParquetIO.parseGenericRecords(ParseGenericRecordAsJsonFn.create()) - .from(temporaryFolder.getRoot().getAbsolutePath() + "/*") - .withSplit()); + .from(temporaryFolder.getRoot().getAbsolutePath() + "/*")); - PAssert.that(readBackAsJsonWithSplit).containsInAnyOrder(convertRecordsToJson(records)); + PAssert.that(readBackAsJson).containsInAnyOrder(convertRecordsToJson(records)); readPipeline.run().waitUntilFinish(); } @@ -281,7 +258,6 @@ public void testWriteAndReadFiles() { List records = generateGenericRecords(1000); ParquetIO.ReadFiles readFiles = ParquetIO.readFiles(SCHEMA); - assertTrue(readFiles.isSplittable()); PCollection writeThenRead = mainPipeline @@ -308,7 +284,6 @@ public void testReadFilesAsJsonForUnknownSchemaFiles() { ParquetIO.ParseFiles parseFiles = ParquetIO.parseFilesGenericRecords(ParseGenericRecordAsJsonFn.create()); - assertTrue(parseFiles.isSplittable()); PCollection writeThenRead = mainPipeline @@ -401,7 +376,6 @@ public void testReadDisplayData() { DisplayData.from( ParquetIO.read(SCHEMA) .from("foo.parquet") - .withSplit() .withProjection(REQUESTED_SCHEMA, SCHEMA) .withAvroDataModel(GenericData.get()) .withConfiguration(configuration)); @@ -409,7 +383,6 @@ public void testReadDisplayData() { assertThat(displayData, hasDisplayItem("filePattern", "foo.parquet")); assertThat(displayData, hasDisplayItem("schema", SCHEMA.toString())); assertThat(displayData, hasDisplayItem("inferBeamSchema", false)); - assertThat(displayData, hasDisplayItem("splittable", true)); assertThat(displayData, hasDisplayItem("projectionSchema", REQUESTED_SCHEMA.toString())); assertThat(displayData, hasDisplayItem("avroDataModel", GenericData.get().toString())); assertThat(displayData, hasDisplayItem("parquet.foo", "foo")); @@ -445,29 +418,6 @@ public void testWriteAndReadUsingReflectDataSchemaWithoutDataModelThrowsExceptio readPipeline.run().waitUntilFinish(); } - @Test(expected = org.apache.beam.sdk.Pipeline.PipelineExecutionException.class) - public void testWriteAndReadWithSplitUsingReflectDataSchemaWithoutDataModelThrowsException() { - Schema testRecordSchema = ReflectData.get().getSchema(TestRecord.class); - - List records = generateGenericRecords(1000); - mainPipeline - .apply(Create.of(records).withCoder(AvroCoder.of(testRecordSchema))) - .apply( - FileIO.write() - .via(ParquetIO.sink(testRecordSchema)) - .to(temporaryFolder.getRoot().getAbsolutePath())); - mainPipeline.run().waitUntilFinish(); - - PCollection readBack = - readPipeline.apply( - ParquetIO.read(testRecordSchema) - .withSplit() - .from(temporaryFolder.getRoot().getAbsolutePath() + "/*")); - - PAssert.that(readBack).containsInAnyOrder(records); - readPipeline.run().waitUntilFinish(); - } - @Test public void testWriteAndReadUsingReflectDataSchemaWithDataModel() { Schema testRecordSchema = ReflectData.get().getSchema(TestRecord.class); @@ -491,30 +441,6 @@ public void testWriteAndReadUsingReflectDataSchemaWithDataModel() { readPipeline.run().waitUntilFinish(); } - @Test - public void testWriteAndReadWithSplitUsingReflectDataSchemaWithDataModel() { - Schema testRecordSchema = ReflectData.get().getSchema(TestRecord.class); - - List records = generateGenericRecords(1000); - mainPipeline - .apply(Create.of(records).withCoder(AvroCoder.of(testRecordSchema))) - .apply( - FileIO.write() - .via(ParquetIO.sink(testRecordSchema)) - .to(temporaryFolder.getRoot().getAbsolutePath())); - mainPipeline.run().waitUntilFinish(); - - PCollection readBack = - 
readPipeline.apply( - ParquetIO.read(testRecordSchema) - .withSplit() - .withAvroDataModel(GenericData.get()) - .from(temporaryFolder.getRoot().getAbsolutePath() + "/*")); - - PAssert.that(readBack).containsInAnyOrder(records); - readPipeline.run().waitUntilFinish(); - } - @Test public void testWriteAndReadUsingGenericDataSchemaWithDataModel() { Schema schema = new Schema.Parser().parse(SCHEMA_STRING); @@ -538,30 +464,6 @@ public void testWriteAndReadUsingGenericDataSchemaWithDataModel() { readPipeline.run().waitUntilFinish(); } - @Test - public void testWriteAndReadwithSplitUsingGenericDataSchemaWithDataModel() { - Schema schema = new Schema.Parser().parse(SCHEMA_STRING); - - List records = generateGenericRecords(1000); - mainPipeline - .apply(Create.of(records).withCoder(AvroCoder.of(schema))) - .apply( - FileIO.write() - .via(ParquetIO.sink(schema).withAvroDataModel(GenericData.get())) - .to(temporaryFolder.getRoot().getAbsolutePath())); - mainPipeline.run().waitUntilFinish(); - - PCollection readBack = - readPipeline.apply( - ParquetIO.read(schema) - .withSplit() - .withAvroDataModel(GenericData.get()) - .from(temporaryFolder.getRoot().getAbsolutePath() + "/*")); - - PAssert.that(readBack).containsInAnyOrder(records); - readPipeline.run().waitUntilFinish(); - } - @Test public void testWriteAndReadWithConfiguration() { List records = generateGenericRecords(10); @@ -583,8 +485,7 @@ public void testWriteAndReadWithConfiguration() { readPipeline.apply( ParquetIO.read(SCHEMA) .from(temporaryFolder.getRoot().getAbsolutePath() + "/*") - .withConfiguration(configuration) - .withSplit()); + .withConfiguration(configuration)); PAssert.that(readBack).containsInAnyOrder(expectedRecords); readPipeline.run().waitUntilFinish(); } diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java index cf3c7433f08e..cd337e87d876 100644 --- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java +++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java @@ -177,7 +177,6 @@ private static PCollection getTableParquet( "Read " + tableName + " (parquet)", ParquetIO.read(schema) .from(filepattern) - .withSplit() .withProjection(schemaProjected, schemaProjected) .withBeamSchemas(true)); }
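
Usage note: with this patch, splittable reading (SplitReadFn) is the only code
path in ParquetIO, and the deprecated withSplit()/withoutSplit() toggles are
gone. Existing callers migrate by simply dropping the call. Below is a minimal
sketch of a read pipeline against the post-patch API; SCHEMA, PROJECTION_SCHEMA,
ENCODER_SCHEMA, and the input glob are illustrative placeholders, not part of
this commit:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.parquet.ParquetIO;
    import org.apache.beam.sdk.values.PCollection;

    Pipeline pipeline = Pipeline.create();

    // Splittable reading is always on; there is no withSplit() to call anymore.
    PCollection<GenericRecord> records =
        pipeline.apply(
            ParquetIO.read(SCHEMA)
                .from("/path/to/input/*.parquet"));

    // Column projection still runs on the splittable path, as it did before;
    // it just no longer needs to flip an internal "splittable" flag.
    PCollection<GenericRecord> projected =
        pipeline.apply(
            "Read projected",
            ParquetIO.read(SCHEMA)
                .from("/path/to/input/*.parquet")
                .withProjection(PROJECTION_SCHEMA, ENCODER_SCHEMA));

This mirrors the pre-2.35.0 opt-in example removed from the class javadoc
above, minus the withSplit() call.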