From f3a5b25681b3ac59e9d254799d629147572e32d1 Mon Sep 17 00:00:00 2001 From: Jeffrey Kinard Date: Fri, 27 Sep 2024 18:09:55 -0400 Subject: [PATCH 1/2] [yaml] Preserve windowing for unbounded input when using WriteToJson Java provider Signed-off-by: Jeffrey Kinard --- .../csv/providers/CsvWriteTransformProvider.java | 14 ++++++++++---- .../json/providers/JsonWriteTransformProvider.java | 10 ++++++++-- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java index f4d54c408cf4..87e51afbcbfd 100644 --- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java +++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptors; @@ -134,10 +135,15 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { if (configuration.getDelimiter() != null) { format = format.withDelimiter(configuration.getDelimiter().charAt(0)); } - WriteFilesResult result = - input - .get(INPUT_ROWS_TAG) - .apply(CsvIO.writeRows(configuration.getPath(), format).withSuffix("")); + + // Preserve input windowing + CsvIO.Write writeTransform = + CsvIO.writeRows(configuration.getPath(), format).withSuffix(""); + if (input.get(INPUT_ROWS_TAG).isBounded() == PCollection.IsBounded.UNBOUNDED) { + writeTransform = writeTransform.withWindowedWrites(); + } + + WriteFilesResult result = input.get(INPUT_ROWS_TAG).apply(writeTransform); Schema outputSchema = Schema.of(Field.of("filename", FieldType.STRING)); return PCollectionRowTuple.of( WRITE_RESULTS, diff --git a/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java index 9e030821e5ca..de7095f382a1 100644 --- a/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java +++ b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java @@ -35,6 +35,7 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptors; @@ -121,8 +122,13 @@ protected static class JsonWriteTransform extends SchemaTransform { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { - WriteFilesResult result = - input.get(INPUT_ROWS_TAG).apply(JsonIO.writeRows(configuration.getPath()).withSuffix("")); + // Preserve input windowing + JsonIO.Write writeTransform = JsonIO.writeRows(configuration.getPath()).withSuffix(""); + if (input.get(INPUT_ROWS_TAG).isBounded() == PCollection.IsBounded.UNBOUNDED) { + writeTransform = writeTransform.withWindowedWrites(); + } + + WriteFilesResult result = input.get(INPUT_ROWS_TAG).apply(writeTransform); Schema outputSchema = Schema.of(Field.of("filename", FieldType.STRING)); return PCollectionRowTuple.of( WRITE_RESULTS, From 821793ede77379491af6e071e71ac18612cbca3a Mon Sep 17 00:00:00 2001 From: Jeffrey Kinard Date: Mon, 30 Sep 2024 17:08:44 -0400 Subject: [PATCH 2/2] preserve non-global windowing Signed-off-by: Jeffrey Kinard --- .../sdk/io/csv/providers/CsvWriteTransformProvider.java | 7 +++++-- .../sdk/io/json/providers/JsonWriteTransformProvider.java | 7 +++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java index 87e51afbcbfd..89e8211026b0 100644 --- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java +++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java @@ -36,10 +36,10 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.commons.csv.CSVFormat; @@ -139,7 +139,10 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { // Preserve input windowing CsvIO.Write writeTransform = CsvIO.writeRows(configuration.getPath(), format).withSuffix(""); - if (input.get(INPUT_ROWS_TAG).isBounded() == PCollection.IsBounded.UNBOUNDED) { + if (!input + .get(INPUT_ROWS_TAG) + .getWindowingStrategy() + .equals(WindowingStrategy.globalDefault())) { writeTransform = writeTransform.withWindowedWrites(); } diff --git a/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java index de7095f382a1..a522d176fac5 100644 --- a/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java +++ b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java @@ -35,10 +35,10 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; /** @@ -124,7 +124,10 @@ protected static class JsonWriteTransform extends SchemaTransform { public PCollectionRowTuple expand(PCollectionRowTuple input) { // Preserve input windowing JsonIO.Write writeTransform = JsonIO.writeRows(configuration.getPath()).withSuffix(""); - if (input.get(INPUT_ROWS_TAG).isBounded() == PCollection.IsBounded.UNBOUNDED) { + if (!input + .get(INPUT_ROWS_TAG) + .getWindowingStrategy() + .equals(WindowingStrategy.globalDefault())) { writeTransform = writeTransform.withWindowedWrites(); }