From 4cca79102a127bdda36f68892605354c93841b91 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 29 Aug 2024 15:18:11 -0400 Subject: [PATCH 1/8] RowFilter --- .../org/apache/beam/sdk/util/RowFilter.java | 347 ++++++++++++++++++ .../apache/beam/sdk/util/RowFilterTest.java | 333 +++++++++++++++++ 2 files changed, 680 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java new file mode 100644 index 000000000000..a36f79c8345a --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * A utility that filters fields from Beam {@link Row}s. This filter can be configured to indicate + * what fields you would like to either keep or drop. Afterward, + * call {@link #filter(Row)} on a Schema-compatible Row to filter it. An un-configured filter will + * simply return the input row untouched. + * + *

Nested fields can be expressed using dot-notation (e.g. {@code "top.middle.nested"}). + * + *

A configured {@link RowFilter} will naturally produce {@link Row}s with a new Beam {@link + * Schema}. You can access this new Schema ahead of time via the filter's {@link #outputSchema()}. + * + *

Configure a {@link RowFilter} as follows: + * + *

{@code
+ * // this is an un-configured filter
+ * RowFilter unconfigured = new RowFilter(beamSchema);
+ *
+ * List fields = Arrays.asList("foo", "bar.xyz", "baz.abc.qwe");
+ *
+ * // this filter will exclusively keep these fields and drop everything else
+ * RowFilter keepingFilter = new RowFilter(beamSchema).keeping(fields);
+ *
+ * // this filter will drop these fields
+ * RowFilter droppingFilter = new RowFilter(beamSchema).dropping(fields);
+ *
+ * // produces a filtered row
+ * Row outputRow = keepingFilter.filter(row);
+ * }
+ * + * Check the documentation for {@link #keeping(List)} and {@link #dropping(List)} for further + * details on what a filtered Row can look like. + */ +public class RowFilter implements Serializable { + private final Schema rowSchema; + private @Nullable Schema transformedSchema; + + public RowFilter(Schema rowSchema) { + this.rowSchema = rowSchema; + } + + /** + * Checks whether a {@link Schema} contains a list of field names. Nested fields can be expressed + * with dot-notation. Throws a helpful error in the case where a field doesn't exist, or if a + * nested field could not be reached. + */ + @VisibleForTesting + static void validateSchemaContainsFields( + Schema schema, List specifiedFields, String operation) { + Set notFound = new HashSet<>(); + Set notRowField = new HashSet<>(); + + for (String field : specifiedFields) { + List levels = Splitter.on(".").splitToList(field); + + Schema currentSchema = schema; + + for (int i = 0; i < levels.size(); i++) { + String currentFieldName = String.join(".", levels.subList(0, i + 1)); + + if (!currentSchema.hasField(levels.get(i))) { + notFound.add(currentFieldName); + break; + } + + if (i + 1 < levels.size()) { + Schema.Field nextField = currentSchema.getField(levels.get(i)); + if (!nextField.getType().getTypeName().equals(Schema.TypeName.ROW)) { + notRowField.add(currentFieldName); + break; + } + currentSchema = Preconditions.checkNotNull(nextField.getType().getRowSchema()); + } + } + } + + if (!notFound.isEmpty() || !notRowField.isEmpty()) { + String message = "Validation failed for " + operation + "."; + if (!notFound.isEmpty()) { + message += "\nRow Schema does not contain the following specified fields: " + notFound; + } + if (!notRowField.isEmpty()) { + message += + "\nThe following specified fields are not of type Row. Their nested fields could not be reached: " + + notRowField; + } + throw new IllegalArgumentException(message); + } + } + + /** + * Configures this {@link RowFilter} to filter {@link Row}s by keeping only the specified fields. + * Nested fields can be specified using dot-notation. + * + *

For example, if we want to keep the list of fields {@code ["foo", "baz.nested_1"]}, for the + * input {@link Row}: + * + *

{@code
+     * foo: 123
+     * bar: 456
+     * baz:
+     *   nested_1: abc
+     *   nested_2: xyz
+     * }
+ * + * we will get the following output {@link Row}: + * + *
{@code
+     * foo: 123
+     * baz
+     *   nested_1: abc
+     * }
+ */ + public RowFilter keeping(List fields) { + Preconditions.checkState( + transformedSchema == null, + "This RowFilter has already been configured to filter to the following Schema: %s", + transformedSchema); + validateSchemaContainsFields(rowSchema, fields, "\"keep\""); + transformedSchema = keepFields(rowSchema, fields); + return this; + } + + /** + * Configures this {@link RowFilter} to filter {@link Row} by removing the specified fields. + * Nested fields can be specified using dot-notation. + * + *

For example, if we want to drop the list of fields {@code ["foo", "baz.nested_1"]}, for this + * input {@link Row}: + * + *

{@code
+     * foo: 123
+     * bar: 456
+     * baz:
+     *   nested_1: abc
+     *   nested_2: xyz
+     * }
+ * + * we will get the following output {@link Row}: + * + *
{@code
+     * bar: 456
+     * baz:
+     *   nested_2: xyz
+     * }
+ */ + public RowFilter dropping(List fields) { + Preconditions.checkState( + transformedSchema == null, + "This RowFilter has already been configured to filter to the following Schema: %s", + transformedSchema); + validateSchemaContainsFields(rowSchema, fields, "\"drop\""); + transformedSchema = dropFields(rowSchema, fields); + return this; + } + + /** + * Creates a field tree, separating each top-level field from its (potential) nested fields. E.g. + * ["foo.bar.baz", "foo.abc", "xyz"] --> {"foo": ["bar.baz", "abc"], "xyz": []} + */ + @VisibleForTesting + static Map> getFieldTree(List fields) { + Map> fieldTree = Maps.newHashMap(); + + for (String field : fields) { + List components = Splitter.on(".").splitToList(field); + String root = components.get(0); + fieldTree.computeIfAbsent(root, r -> new ArrayList<>()); + + if (components.size() > 1) { + String nestedFields = String.join(".", components.subList(1, components.size())); + Preconditions.checkNotNull(fieldTree.get(root)).add(nestedFields); + } + } + return fieldTree; + } + + /** + * Returns a new {@link Row} containing only the fields that intersect with the new {@link Schema} + * Relies on a previous step to have validated the compatibility of the new {@link Schema}. + */ + @VisibleForTesting + @Nullable + static Row copyWithNewSchema(@Nullable Row row, Schema newSchema) { + if (row == null) { + return null; + } + Map values = new HashMap<>(newSchema.getFieldCount()); + + for (Schema.Field field : newSchema.getFields()) { + String name = field.getName(); + Object value = row.getValue(name); + if (field.getType().getTypeName().equals(Schema.TypeName.ROW)) { + Schema nestedRowSchema = Preconditions.checkNotNull(field.getType().getRowSchema()); + value = copyWithNewSchema(row.getRow(name), nestedRowSchema); + } + if (value != null) { + values.put(name, value); + } + } + return Row.withSchema(newSchema).withFieldValues(values).build(); + } + + /** + * Returns a new {@link Schema} with the specified fields removed. + * + *

No guarantee that field ordering will remain the same. + */ + @VisibleForTesting + static Schema dropFields(Schema schema, List fieldsToDrop) { + if (fieldsToDrop.isEmpty()) { + return schema; + } + List newFieldsList = new ArrayList<>(schema.getFields()); + Map> fieldTree = getFieldTree(fieldsToDrop); + + for (Map.Entry> fieldAndDescendents : fieldTree.entrySet()) { + String root = fieldAndDescendents.getKey(); + List nestedFields = fieldAndDescendents.getValue(); + Schema.Field fieldToRemove = schema.getField(root); + Schema.FieldType typeToRemove = fieldToRemove.getType(); + + // Base case: we're at the specified field to remove. + if (nestedFields.isEmpty()) { + newFieldsList.remove(fieldToRemove); + } else { + // Otherwise, we're asked to remove a nested field. Verify current field is ROW type + Preconditions.checkArgument( + typeToRemove.getTypeName().equals(Schema.TypeName.ROW), + "Expected type %s for specified nested field '%s', but instead got type %s.", + Schema.TypeName.ROW, + root, + typeToRemove.getTypeName()); + + Schema nestedSchema = Preconditions.checkNotNull(typeToRemove.getRowSchema()); + Schema newNestedSchema = dropFields(nestedSchema, nestedFields); + Schema.Field modifiedField = + Schema.Field.of(root, Schema.FieldType.row(newNestedSchema)) + .withNullable(typeToRemove.getNullable()); + + // Replace with modified field + newFieldsList.set(newFieldsList.indexOf(fieldToRemove), modifiedField); + } + } + return new Schema(newFieldsList); + } + + /** + * Returns a new {@link Schema} with only the specified fields kept. + * + *

No guarantee that field ordering will remain the same. + */ + @VisibleForTesting + static Schema keepFields(Schema schema, List fieldsToKeep) { + if (fieldsToKeep.isEmpty()) { + return schema; + } + List newFieldsList = new ArrayList<>(fieldsToKeep.size()); + Map> fieldTree = getFieldTree(fieldsToKeep); + + for (Map.Entry> fieldAndDescendents : fieldTree.entrySet()) { + String root = fieldAndDescendents.getKey(); + List nestedFields = fieldAndDescendents.getValue(); + Schema.Field fieldToKeep = schema.getField(root); + Schema.FieldType typeToKeep = fieldToKeep.getType(); + + // Base case: we're at the specified field to keep, and we can skip this conditional. + // Otherwise: we're asked to keep a nested field, so we dig deeper to determine which nested + // fields to keep + if (!nestedFields.isEmpty()) { + Preconditions.checkArgument( + typeToKeep.getTypeName().equals(Schema.TypeName.ROW), + "Expected type %s for specified nested field '%s', but instead got type %s.", + Schema.TypeName.ROW, + root, + typeToKeep.getTypeName()); + + Schema nestedSchema = Preconditions.checkNotNull(typeToKeep.getRowSchema()); + Schema newNestedSchema = keepFields(nestedSchema, nestedFields); + fieldToKeep = + Schema.Field.of(root, Schema.FieldType.row(newNestedSchema)) + .withNullable(typeToKeep.getNullable()); + } + newFieldsList.add(fieldToKeep); + } + + return new Schema(newFieldsList); + } + + /** + * Performs a filter operation (keep or drop) on the input {@link Row}. Must have already + * configured a filter operation with {@link #dropping(List)} or {@link #keeping(List)} for this + * {@link RowFilter}. + * + *

If not yet configured, will simply return the same {@link Row}. + */ + public Row filter(Row row) { + if (transformedSchema == null) { + return row; + } + Preconditions.checkState( + row.getSchema().assignableTo(rowSchema), + "Encountered Row with schema that is incompatible with this RowFilter's schema.\nRow schema: %s\nSchema used to initialize this RowFilter: %s", + row.getSchema(), + rowSchema); + + return Preconditions.checkNotNull(copyWithNewSchema(row, outputSchema())); + } + + /** Returns the output {@link Row}'s {@link Schema}. */ + public Schema outputSchema() { + return transformedSchema != null ? transformedSchema : rowSchema; + } +} \ No newline at end of file diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java new file mode 100644 index 000000000000..5ea55218c46c --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +/** Tests for {@link RowFilter}. */ +public class RowFilterTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + + private static final Schema DOUBLY_NESTED_ROW_SCHEMA = + Schema.builder() + .addStringField("doubly_nested_str") + .addInt32Field("doubly_nested_int") + .build(); + + private static final Schema NESTED_ROW_SCHEMA = + Schema.builder() + .addStringField("nested_str") + .addInt32Field("nested_int") + .addFloatField("nested_float") + .addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA) + .build(); + private static final Schema ROW_SCHEMA = + Schema.builder() + .addStringField("str") + .addBooleanField("bool") + .addNullableInt32Field("nullable_int") + .addArrayField("arr_int", Schema.FieldType.INT32) + .addRowField("row", NESTED_ROW_SCHEMA) + .addNullableRowField("nullable_row", NESTED_ROW_SCHEMA) + .build(); + + @Test + public void testSchemaValidation() { + List> goodFields = + Arrays.asList( + Arrays.asList("str", "bool", "nullable_row"), + Arrays.asList("nullable_int", "arr_int"), + Arrays.asList("row.nested_str", "row.nested_row.doubly_nested_str"), + Arrays.asList("nullable_row.nested_row.doubly_nested_int")); + + for (List fields : goodFields) { + RowFilter.validateSchemaContainsFields(ROW_SCHEMA, fields, "test-operation"); + } + } + + @Test + public void testSchemaValidationFailsWithHelpfulErrorForMissingFields() { + List, List>> nonExistentFields = + Arrays.asList( + KV.of( + Arrays.asList("nonexistent_1", "nonexistent_2", "nonexistent_3"), + Arrays.asList("nonexistent_1", "nonexistent_2", "nonexistent_3")), + KV.of( + Arrays.asList("nullable_int", "arr_int", "nonexistent"), + Collections.singletonList("nonexistent")), + KV.of( + Arrays.asList( + "nullable_row.nested_row.nonexistent", "row.nonexistent", "row.nested_float"), + Arrays.asList("nullable_row.nested_row.nonexistent", "row.nonexistent"))); + + for (KV, List> fields : nonExistentFields) { + List allFields = fields.getKey(); + List badFields = fields.getValue(); + + IllegalArgumentException e = + assertThrows( + IllegalArgumentException.class, + () -> + RowFilter.validateSchemaContainsFields(ROW_SCHEMA, allFields, "test-operation")); + + assertThat(e.getMessage(), containsString("Validation failed for test-operation")); + assertThat( + e.getMessage(), + containsString("Row Schema does not contain the following specified fields")); + for (String badField : badFields) { + assertThat(e.getMessage(), containsString(badField)); + } + } + } + + @Test + public void testSchemaValidationFailsWithHelpfulErrorForInvalidNestedFields() { + List, List>> nonNestedFields = + Arrays.asList( + KV.of( + Arrays.asList( + "row.nested_row", "row.nested_int", "row.nested_str.unexpected_nested"), + Collections.singletonList("row.nested_str")), + KV.of( + Arrays.asList( + "nullable_row.nested_str", + "nullable_row.nested_str.unexpected", + "row.nested_int.unexpected_2"), + Arrays.asList("nullable_row.nested_str", "row.nested_int"))); + + for (KV, List> fields : nonNestedFields) { + List allFields = fields.getKey(); + List badFields = fields.getValue(); + + IllegalArgumentException e = + assertThrows( + IllegalArgumentException.class, + () -> + RowFilter.validateSchemaContainsFields(ROW_SCHEMA, allFields, "test-operation")); + + assertThat(e.getMessage(), containsString("Validation failed for test-operation")); + assertThat( + e.getMessage(), + containsString( + "The following specified fields are not of type Row. Their nested fields could not be reached")); + for (String badField : badFields) { + assertThat(e.getMessage(), containsString(badField)); + } + } + } + + @Test + public void testGetFieldTree() { + List fields = + Arrays.asList( + "top-level", + "top-level-2", + "top-level.nested-level", + "top-level.nested-level-2", + "top-level.nested-level.doubly-nested-level", + "top-level.nested-level.doubly-nested-level-2"); + List nestedLayer = + Arrays.asList( + "nested-level", + "nested-level-2", + "nested-level.doubly-nested-level", + "nested-level.doubly-nested-level-2"); + + Map> expectedTree = + ImmutableMap.>builder() + .put("top-level-2", Collections.emptyList()) + .put("top-level", nestedLayer) + .build(); + + assertEquals(expectedTree, RowFilter.getFieldTree(fields)); + + List doublyNestedLayer = Arrays.asList("doubly-nested-level", "doubly-nested-level-2"); + + Map> expectedNestedTree = + ImmutableMap.>builder() + .put("nested-level-2", Collections.emptyList()) + .put("nested-level", doublyNestedLayer) + .build(); + + assertEquals(expectedNestedTree, RowFilter.getFieldTree(nestedLayer)); + } + + @Test + public void testDropSchemaFields() { + List fieldsToDrop = + Arrays.asList( + "str", + "arr_int", + "nullable_int", + "row.nested_int", + "row.nested_float", + "row.nested_row.doubly_nested_int", + "nullable_row.nested_str", + "nullable_row.nested_row"); + + Schema expectedDroppedSchema = + Schema.builder() + .addBooleanField("bool") + .addRowField( + "row", + Schema.builder() + .addStringField("nested_str") + .addRowField( + "nested_row", Schema.builder().addStringField("doubly_nested_str").build()) + .build()) + .addNullableRowField( + "nullable_row", + Schema.builder().addInt32Field("nested_int").addFloatField("nested_float").build()) + .build(); + + assertTrue(expectedDroppedSchema.equivalent(RowFilter.dropFields(ROW_SCHEMA, fieldsToDrop))); + } + + @Test + public void testKeepSchemaFields() { + List fieldsToKeep = + Arrays.asList( + "str", + "arr_int", + "nullable_int", + "row.nested_int", + "row.nested_float", + "row.nested_row.doubly_nested_int", + "nullable_row.nested_str", + "nullable_row.nested_row"); + + Schema expectedKeptSchema = + Schema.builder() + .addStringField("str") + .addArrayField("arr_int", Schema.FieldType.INT32) + .addNullableInt32Field("nullable_int") + .addRowField( + "row", + Schema.builder() + .addInt32Field("nested_int") + .addFloatField("nested_float") + .addRowField( + "nested_row", Schema.builder().addInt32Field("doubly_nested_int").build()) + .build()) + .addNullableRowField( + "nullable_row", + Schema.builder() + .addStringField("nested_str") + .addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA) + .build()) + .build(); + + assertTrue(expectedKeptSchema.equivalent(RowFilter.keepFields(ROW_SCHEMA, fieldsToKeep))); + } + + private static final Row ORIGINAL_ROW = + Row.withSchema(ROW_SCHEMA) + .addValue("str_value") + .addValue(true) + .addValue(123) + .addValue(Arrays.asList(1, 2, 3, 4, 5)) + .addValue( + Row.withSchema(NESTED_ROW_SCHEMA) + .addValue("nested_str_value") + .addValue(456) + .addValue(1.234f) + .addValue( + Row.withSchema(DOUBLY_NESTED_ROW_SCHEMA) + .addValue("doubly_nested_str_value") + .addValue(789) + .build()) + .build()) + .addValue(null) + .build(); + + private static final Schema FILTERED_DOUBLY_NESTED_SCHEMA = + Schema.builder().addStringField("doubly_nested_str").build(); + private static final Schema FILTERED_NESTED_SCHEMA = + Schema.builder() + .addStringField("nested_str") + .addRowField("nested_row", FILTERED_DOUBLY_NESTED_SCHEMA) + .build(); + private static final Schema FILTERED_SCHEMA = + Schema.builder() + .addStringField("str") + .addArrayField("arr_int", Schema.FieldType.INT32) + .addRowField("row", FILTERED_NESTED_SCHEMA) + .build(); + + private static final Row FILTERED_ROW = + Row.withSchema(FILTERED_SCHEMA) + .addValue("str_value") + .addValue(Arrays.asList(1, 2, 3, 4, 5)) + .addValue( + Row.withSchema(FILTERED_NESTED_SCHEMA) + .addValue("nested_str_value") + .addValue( + Row.withSchema(FILTERED_DOUBLY_NESTED_SCHEMA) + .addValue("doubly_nested_str_value") + .build()) + .build()) + .build(); + + @Test + public void testCopyRowWithNewSchema() { + assertEquals(FILTERED_ROW, RowFilter.copyWithNewSchema(ORIGINAL_ROW, FILTERED_SCHEMA)); + } + + @Test + public void testDropRowFields() { + RowFilter rowFilter = + new RowFilter(ROW_SCHEMA) + .dropping( + Arrays.asList( + "bool", + "nullable_int", + "row.nested_int", + "row.nested_float", + "row.nested_row.doubly_nested_int", + "nullable_row")); + + assertEquals(FILTERED_ROW, rowFilter.filter(ORIGINAL_ROW)); + } + + @Test + public void testKeepRowFields() { + RowFilter rowFilter = + new RowFilter(ROW_SCHEMA) + .keeping( + Arrays.asList( + "str", "arr_int", "row.nested_str", "row.nested_row.doubly_nested_str")); + + assertEquals(FILTERED_ROW, rowFilter.filter(ORIGINAL_ROW)); + } +} \ No newline at end of file From 1027bdb8ef63a401315af5e373227d52ed7c4af6 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 29 Aug 2024 15:49:12 -0400 Subject: [PATCH 2/8] spotless --- .../org/apache/beam/sdk/util/RowFilter.java | 490 +++++++-------- .../apache/beam/sdk/util/RowFilterTest.java | 566 +++++++++--------- 2 files changed, 528 insertions(+), 528 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java index a36f79c8345a..d53c68d87c8b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java @@ -65,283 +65,283 @@ * details on what a filtered Row can look like. */ public class RowFilter implements Serializable { - private final Schema rowSchema; - private @Nullable Schema transformedSchema; + private final Schema rowSchema; + private @Nullable Schema transformedSchema; - public RowFilter(Schema rowSchema) { - this.rowSchema = rowSchema; - } - - /** - * Checks whether a {@link Schema} contains a list of field names. Nested fields can be expressed - * with dot-notation. Throws a helpful error in the case where a field doesn't exist, or if a - * nested field could not be reached. - */ - @VisibleForTesting - static void validateSchemaContainsFields( - Schema schema, List specifiedFields, String operation) { - Set notFound = new HashSet<>(); - Set notRowField = new HashSet<>(); + public RowFilter(Schema rowSchema) { + this.rowSchema = rowSchema; + } - for (String field : specifiedFields) { - List levels = Splitter.on(".").splitToList(field); + /** + * Checks whether a {@link Schema} contains a list of field names. Nested fields can be expressed + * with dot-notation. Throws a helpful error in the case where a field doesn't exist, or if a + * nested field could not be reached. + */ + @VisibleForTesting + static void validateSchemaContainsFields( + Schema schema, List specifiedFields, String operation) { + Set notFound = new HashSet<>(); + Set notRowField = new HashSet<>(); - Schema currentSchema = schema; + for (String field : specifiedFields) { + List levels = Splitter.on(".").splitToList(field); - for (int i = 0; i < levels.size(); i++) { - String currentFieldName = String.join(".", levels.subList(0, i + 1)); + Schema currentSchema = schema; - if (!currentSchema.hasField(levels.get(i))) { - notFound.add(currentFieldName); - break; - } + for (int i = 0; i < levels.size(); i++) { + String currentFieldName = String.join(".", levels.subList(0, i + 1)); - if (i + 1 < levels.size()) { - Schema.Field nextField = currentSchema.getField(levels.get(i)); - if (!nextField.getType().getTypeName().equals(Schema.TypeName.ROW)) { - notRowField.add(currentFieldName); - break; - } - currentSchema = Preconditions.checkNotNull(nextField.getType().getRowSchema()); - } - } + if (!currentSchema.hasField(levels.get(i))) { + notFound.add(currentFieldName); + break; } - if (!notFound.isEmpty() || !notRowField.isEmpty()) { - String message = "Validation failed for " + operation + "."; - if (!notFound.isEmpty()) { - message += "\nRow Schema does not contain the following specified fields: " + notFound; - } - if (!notRowField.isEmpty()) { - message += - "\nThe following specified fields are not of type Row. Their nested fields could not be reached: " - + notRowField; - } - throw new IllegalArgumentException(message); + if (i + 1 < levels.size()) { + Schema.Field nextField = currentSchema.getField(levels.get(i)); + if (!nextField.getType().getTypeName().equals(Schema.TypeName.ROW)) { + notRowField.add(currentFieldName); + break; + } + currentSchema = Preconditions.checkNotNull(nextField.getType().getRowSchema()); } + } } - /** - * Configures this {@link RowFilter} to filter {@link Row}s by keeping only the specified fields. - * Nested fields can be specified using dot-notation. - * - *

For example, if we want to keep the list of fields {@code ["foo", "baz.nested_1"]}, for the - * input {@link Row}: - * - *

{@code
-     * foo: 123
-     * bar: 456
-     * baz:
-     *   nested_1: abc
-     *   nested_2: xyz
-     * }
- * - * we will get the following output {@link Row}: - * - *
{@code
-     * foo: 123
-     * baz
-     *   nested_1: abc
-     * }
- */ - public RowFilter keeping(List fields) { - Preconditions.checkState( - transformedSchema == null, - "This RowFilter has already been configured to filter to the following Schema: %s", - transformedSchema); - validateSchemaContainsFields(rowSchema, fields, "\"keep\""); - transformedSchema = keepFields(rowSchema, fields); - return this; + if (!notFound.isEmpty() || !notRowField.isEmpty()) { + String message = "Validation failed for " + operation + "."; + if (!notFound.isEmpty()) { + message += "\nRow Schema does not contain the following specified fields: " + notFound; + } + if (!notRowField.isEmpty()) { + message += + "\nThe following specified fields are not of type Row. Their nested fields could not be reached: " + + notRowField; + } + throw new IllegalArgumentException(message); } + } - /** - * Configures this {@link RowFilter} to filter {@link Row} by removing the specified fields. - * Nested fields can be specified using dot-notation. - * - *

For example, if we want to drop the list of fields {@code ["foo", "baz.nested_1"]}, for this - * input {@link Row}: - * - *

{@code
-     * foo: 123
-     * bar: 456
-     * baz:
-     *   nested_1: abc
-     *   nested_2: xyz
-     * }
- * - * we will get the following output {@link Row}: - * - *
{@code
-     * bar: 456
-     * baz:
-     *   nested_2: xyz
-     * }
- */ - public RowFilter dropping(List fields) { - Preconditions.checkState( - transformedSchema == null, - "This RowFilter has already been configured to filter to the following Schema: %s", - transformedSchema); - validateSchemaContainsFields(rowSchema, fields, "\"drop\""); - transformedSchema = dropFields(rowSchema, fields); - return this; - } + /** + * Configures this {@link RowFilter} to filter {@link Row}s by keeping only the specified fields. + * Nested fields can be specified using dot-notation. + * + *

For example, if we want to keep the list of fields {@code ["foo", "baz.nested_1"]}, for the + * input {@link Row}: + * + *

{@code
+   * foo: 123
+   * bar: 456
+   * baz:
+   *   nested_1: abc
+   *   nested_2: xyz
+   * }
+ * + * we will get the following output {@link Row}: + * + *
{@code
+   * foo: 123
+   * baz
+   *   nested_1: abc
+   * }
+ */ + public RowFilter keeping(List fields) { + Preconditions.checkState( + transformedSchema == null, + "This RowFilter has already been configured to filter to the following Schema: %s", + transformedSchema); + validateSchemaContainsFields(rowSchema, fields, "\"keep\""); + transformedSchema = keepFields(rowSchema, fields); + return this; + } - /** - * Creates a field tree, separating each top-level field from its (potential) nested fields. E.g. - * ["foo.bar.baz", "foo.abc", "xyz"] --> {"foo": ["bar.baz", "abc"], "xyz": []} - */ - @VisibleForTesting - static Map> getFieldTree(List fields) { - Map> fieldTree = Maps.newHashMap(); + /** + * Configures this {@link RowFilter} to filter {@link Row} by removing the specified fields. + * Nested fields can be specified using dot-notation. + * + *

For example, if we want to drop the list of fields {@code ["foo", "baz.nested_1"]}, for this + * input {@link Row}: + * + *

{@code
+   * foo: 123
+   * bar: 456
+   * baz:
+   *   nested_1: abc
+   *   nested_2: xyz
+   * }
+ * + * we will get the following output {@link Row}: + * + *
{@code
+   * bar: 456
+   * baz:
+   *   nested_2: xyz
+   * }
+ */ + public RowFilter dropping(List fields) { + Preconditions.checkState( + transformedSchema == null, + "This RowFilter has already been configured to filter to the following Schema: %s", + transformedSchema); + validateSchemaContainsFields(rowSchema, fields, "\"drop\""); + transformedSchema = dropFields(rowSchema, fields); + return this; + } - for (String field : fields) { - List components = Splitter.on(".").splitToList(field); - String root = components.get(0); - fieldTree.computeIfAbsent(root, r -> new ArrayList<>()); + /** + * Creates a field tree, separating each top-level field from its (potential) nested fields. E.g. + * ["foo.bar.baz", "foo.abc", "xyz"] --> {"foo": ["bar.baz", "abc"], "xyz": []} + */ + @VisibleForTesting + static Map> getFieldTree(List fields) { + Map> fieldTree = Maps.newHashMap(); - if (components.size() > 1) { - String nestedFields = String.join(".", components.subList(1, components.size())); - Preconditions.checkNotNull(fieldTree.get(root)).add(nestedFields); - } - } - return fieldTree; + for (String field : fields) { + List components = Splitter.on(".").splitToList(field); + String root = components.get(0); + fieldTree.computeIfAbsent(root, r -> new ArrayList<>()); + + if (components.size() > 1) { + String nestedFields = String.join(".", components.subList(1, components.size())); + Preconditions.checkNotNull(fieldTree.get(root)).add(nestedFields); + } } + return fieldTree; + } - /** - * Returns a new {@link Row} containing only the fields that intersect with the new {@link Schema} - * Relies on a previous step to have validated the compatibility of the new {@link Schema}. - */ - @VisibleForTesting - @Nullable - static Row copyWithNewSchema(@Nullable Row row, Schema newSchema) { - if (row == null) { - return null; - } - Map values = new HashMap<>(newSchema.getFieldCount()); + /** + * Returns a new {@link Row} containing only the fields that intersect with the new {@link Schema} + * Relies on a previous step to have validated the compatibility of the new {@link Schema}. + */ + @VisibleForTesting + @Nullable + static Row copyWithNewSchema(@Nullable Row row, Schema newSchema) { + if (row == null) { + return null; + } + Map values = new HashMap<>(newSchema.getFieldCount()); - for (Schema.Field field : newSchema.getFields()) { - String name = field.getName(); - Object value = row.getValue(name); - if (field.getType().getTypeName().equals(Schema.TypeName.ROW)) { - Schema nestedRowSchema = Preconditions.checkNotNull(field.getType().getRowSchema()); - value = copyWithNewSchema(row.getRow(name), nestedRowSchema); - } - if (value != null) { - values.put(name, value); - } - } - return Row.withSchema(newSchema).withFieldValues(values).build(); + for (Schema.Field field : newSchema.getFields()) { + String name = field.getName(); + Object value = row.getValue(name); + if (field.getType().getTypeName().equals(Schema.TypeName.ROW)) { + Schema nestedRowSchema = Preconditions.checkNotNull(field.getType().getRowSchema()); + value = copyWithNewSchema(row.getRow(name), nestedRowSchema); + } + if (value != null) { + values.put(name, value); + } } + return Row.withSchema(newSchema).withFieldValues(values).build(); + } - /** - * Returns a new {@link Schema} with the specified fields removed. - * - *

No guarantee that field ordering will remain the same. - */ - @VisibleForTesting - static Schema dropFields(Schema schema, List fieldsToDrop) { - if (fieldsToDrop.isEmpty()) { - return schema; - } - List newFieldsList = new ArrayList<>(schema.getFields()); - Map> fieldTree = getFieldTree(fieldsToDrop); + /** + * Returns a new {@link Schema} with the specified fields removed. + * + *

No guarantee that field ordering will remain the same. + */ + @VisibleForTesting + static Schema dropFields(Schema schema, List fieldsToDrop) { + if (fieldsToDrop.isEmpty()) { + return schema; + } + List newFieldsList = new ArrayList<>(schema.getFields()); + Map> fieldTree = getFieldTree(fieldsToDrop); - for (Map.Entry> fieldAndDescendents : fieldTree.entrySet()) { - String root = fieldAndDescendents.getKey(); - List nestedFields = fieldAndDescendents.getValue(); - Schema.Field fieldToRemove = schema.getField(root); - Schema.FieldType typeToRemove = fieldToRemove.getType(); + for (Map.Entry> fieldAndDescendents : fieldTree.entrySet()) { + String root = fieldAndDescendents.getKey(); + List nestedFields = fieldAndDescendents.getValue(); + Schema.Field fieldToRemove = schema.getField(root); + Schema.FieldType typeToRemove = fieldToRemove.getType(); - // Base case: we're at the specified field to remove. - if (nestedFields.isEmpty()) { - newFieldsList.remove(fieldToRemove); - } else { - // Otherwise, we're asked to remove a nested field. Verify current field is ROW type - Preconditions.checkArgument( - typeToRemove.getTypeName().equals(Schema.TypeName.ROW), - "Expected type %s for specified nested field '%s', but instead got type %s.", - Schema.TypeName.ROW, - root, - typeToRemove.getTypeName()); + // Base case: we're at the specified field to remove. + if (nestedFields.isEmpty()) { + newFieldsList.remove(fieldToRemove); + } else { + // Otherwise, we're asked to remove a nested field. Verify current field is ROW type + Preconditions.checkArgument( + typeToRemove.getTypeName().equals(Schema.TypeName.ROW), + "Expected type %s for specified nested field '%s', but instead got type %s.", + Schema.TypeName.ROW, + root, + typeToRemove.getTypeName()); - Schema nestedSchema = Preconditions.checkNotNull(typeToRemove.getRowSchema()); - Schema newNestedSchema = dropFields(nestedSchema, nestedFields); - Schema.Field modifiedField = - Schema.Field.of(root, Schema.FieldType.row(newNestedSchema)) - .withNullable(typeToRemove.getNullable()); + Schema nestedSchema = Preconditions.checkNotNull(typeToRemove.getRowSchema()); + Schema newNestedSchema = dropFields(nestedSchema, nestedFields); + Schema.Field modifiedField = + Schema.Field.of(root, Schema.FieldType.row(newNestedSchema)) + .withNullable(typeToRemove.getNullable()); - // Replace with modified field - newFieldsList.set(newFieldsList.indexOf(fieldToRemove), modifiedField); - } - } - return new Schema(newFieldsList); + // Replace with modified field + newFieldsList.set(newFieldsList.indexOf(fieldToRemove), modifiedField); + } } + return new Schema(newFieldsList); + } - /** - * Returns a new {@link Schema} with only the specified fields kept. - * - *

No guarantee that field ordering will remain the same. - */ - @VisibleForTesting - static Schema keepFields(Schema schema, List fieldsToKeep) { - if (fieldsToKeep.isEmpty()) { - return schema; - } - List newFieldsList = new ArrayList<>(fieldsToKeep.size()); - Map> fieldTree = getFieldTree(fieldsToKeep); + /** + * Returns a new {@link Schema} with only the specified fields kept. + * + *

No guarantee that field ordering will remain the same. + */ + @VisibleForTesting + static Schema keepFields(Schema schema, List fieldsToKeep) { + if (fieldsToKeep.isEmpty()) { + return schema; + } + List newFieldsList = new ArrayList<>(fieldsToKeep.size()); + Map> fieldTree = getFieldTree(fieldsToKeep); - for (Map.Entry> fieldAndDescendents : fieldTree.entrySet()) { - String root = fieldAndDescendents.getKey(); - List nestedFields = fieldAndDescendents.getValue(); - Schema.Field fieldToKeep = schema.getField(root); - Schema.FieldType typeToKeep = fieldToKeep.getType(); + for (Map.Entry> fieldAndDescendents : fieldTree.entrySet()) { + String root = fieldAndDescendents.getKey(); + List nestedFields = fieldAndDescendents.getValue(); + Schema.Field fieldToKeep = schema.getField(root); + Schema.FieldType typeToKeep = fieldToKeep.getType(); - // Base case: we're at the specified field to keep, and we can skip this conditional. - // Otherwise: we're asked to keep a nested field, so we dig deeper to determine which nested - // fields to keep - if (!nestedFields.isEmpty()) { - Preconditions.checkArgument( - typeToKeep.getTypeName().equals(Schema.TypeName.ROW), - "Expected type %s for specified nested field '%s', but instead got type %s.", - Schema.TypeName.ROW, - root, - typeToKeep.getTypeName()); + // Base case: we're at the specified field to keep, and we can skip this conditional. + // Otherwise: we're asked to keep a nested field, so we dig deeper to determine which nested + // fields to keep + if (!nestedFields.isEmpty()) { + Preconditions.checkArgument( + typeToKeep.getTypeName().equals(Schema.TypeName.ROW), + "Expected type %s for specified nested field '%s', but instead got type %s.", + Schema.TypeName.ROW, + root, + typeToKeep.getTypeName()); - Schema nestedSchema = Preconditions.checkNotNull(typeToKeep.getRowSchema()); - Schema newNestedSchema = keepFields(nestedSchema, nestedFields); - fieldToKeep = - Schema.Field.of(root, Schema.FieldType.row(newNestedSchema)) - .withNullable(typeToKeep.getNullable()); - } - newFieldsList.add(fieldToKeep); - } - - return new Schema(newFieldsList); + Schema nestedSchema = Preconditions.checkNotNull(typeToKeep.getRowSchema()); + Schema newNestedSchema = keepFields(nestedSchema, nestedFields); + fieldToKeep = + Schema.Field.of(root, Schema.FieldType.row(newNestedSchema)) + .withNullable(typeToKeep.getNullable()); + } + newFieldsList.add(fieldToKeep); } - /** - * Performs a filter operation (keep or drop) on the input {@link Row}. Must have already - * configured a filter operation with {@link #dropping(List)} or {@link #keeping(List)} for this - * {@link RowFilter}. - * - *

If not yet configured, will simply return the same {@link Row}. - */ - public Row filter(Row row) { - if (transformedSchema == null) { - return row; - } - Preconditions.checkState( - row.getSchema().assignableTo(rowSchema), - "Encountered Row with schema that is incompatible with this RowFilter's schema.\nRow schema: %s\nSchema used to initialize this RowFilter: %s", - row.getSchema(), - rowSchema); + return new Schema(newFieldsList); + } - return Preconditions.checkNotNull(copyWithNewSchema(row, outputSchema())); + /** + * Performs a filter operation (keep or drop) on the input {@link Row}. Must have already + * configured a filter operation with {@link #dropping(List)} or {@link #keeping(List)} for this + * {@link RowFilter}. + * + *

If not yet configured, will simply return the same {@link Row}. + */ + public Row filter(Row row) { + if (transformedSchema == null) { + return row; } + Preconditions.checkState( + row.getSchema().assignableTo(rowSchema), + "Encountered Row with schema that is incompatible with this RowFilter's schema.\nRow schema: %s\nSchema used to initialize this RowFilter: %s", + row.getSchema(), + rowSchema); - /** Returns the output {@link Row}'s {@link Schema}. */ - public Schema outputSchema() { - return transformedSchema != null ? transformedSchema : rowSchema; - } -} \ No newline at end of file + return Preconditions.checkNotNull(copyWithNewSchema(row, outputSchema())); + } + + /** Returns the output {@link Row}'s {@link Schema}. */ + public Schema outputSchema() { + return transformedSchema != null ? transformedSchema : rowSchema; + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java index 5ea55218c46c..d1779f3d8465 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java @@ -37,297 +37,297 @@ /** Tests for {@link RowFilter}. */ public class RowFilterTest { - @Rule public ExpectedException thrown = ExpectedException.none(); - - private static final Schema DOUBLY_NESTED_ROW_SCHEMA = - Schema.builder() - .addStringField("doubly_nested_str") - .addInt32Field("doubly_nested_int") - .build(); - - private static final Schema NESTED_ROW_SCHEMA = - Schema.builder() - .addStringField("nested_str") - .addInt32Field("nested_int") - .addFloatField("nested_float") - .addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA) - .build(); - private static final Schema ROW_SCHEMA = - Schema.builder() - .addStringField("str") - .addBooleanField("bool") - .addNullableInt32Field("nullable_int") - .addArrayField("arr_int", Schema.FieldType.INT32) - .addRowField("row", NESTED_ROW_SCHEMA) - .addNullableRowField("nullable_row", NESTED_ROW_SCHEMA) - .build(); - - @Test - public void testSchemaValidation() { - List> goodFields = - Arrays.asList( - Arrays.asList("str", "bool", "nullable_row"), - Arrays.asList("nullable_int", "arr_int"), - Arrays.asList("row.nested_str", "row.nested_row.doubly_nested_str"), - Arrays.asList("nullable_row.nested_row.doubly_nested_int")); - - for (List fields : goodFields) { - RowFilter.validateSchemaContainsFields(ROW_SCHEMA, fields, "test-operation"); - } - } - - @Test - public void testSchemaValidationFailsWithHelpfulErrorForMissingFields() { - List, List>> nonExistentFields = - Arrays.asList( - KV.of( - Arrays.asList("nonexistent_1", "nonexistent_2", "nonexistent_3"), - Arrays.asList("nonexistent_1", "nonexistent_2", "nonexistent_3")), - KV.of( - Arrays.asList("nullable_int", "arr_int", "nonexistent"), - Collections.singletonList("nonexistent")), - KV.of( - Arrays.asList( - "nullable_row.nested_row.nonexistent", "row.nonexistent", "row.nested_float"), - Arrays.asList("nullable_row.nested_row.nonexistent", "row.nonexistent"))); - - for (KV, List> fields : nonExistentFields) { - List allFields = fields.getKey(); - List badFields = fields.getValue(); - - IllegalArgumentException e = - assertThrows( - IllegalArgumentException.class, - () -> - RowFilter.validateSchemaContainsFields(ROW_SCHEMA, allFields, "test-operation")); - - assertThat(e.getMessage(), containsString("Validation failed for test-operation")); - assertThat( - e.getMessage(), - containsString("Row Schema does not contain the following specified fields")); - for (String badField : badFields) { - assertThat(e.getMessage(), containsString(badField)); - } - } + @Rule public ExpectedException thrown = ExpectedException.none(); + + private static final Schema DOUBLY_NESTED_ROW_SCHEMA = + Schema.builder() + .addStringField("doubly_nested_str") + .addInt32Field("doubly_nested_int") + .build(); + + private static final Schema NESTED_ROW_SCHEMA = + Schema.builder() + .addStringField("nested_str") + .addInt32Field("nested_int") + .addFloatField("nested_float") + .addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA) + .build(); + private static final Schema ROW_SCHEMA = + Schema.builder() + .addStringField("str") + .addBooleanField("bool") + .addNullableInt32Field("nullable_int") + .addArrayField("arr_int", Schema.FieldType.INT32) + .addRowField("row", NESTED_ROW_SCHEMA) + .addNullableRowField("nullable_row", NESTED_ROW_SCHEMA) + .build(); + + @Test + public void testSchemaValidation() { + List> goodFields = + Arrays.asList( + Arrays.asList("str", "bool", "nullable_row"), + Arrays.asList("nullable_int", "arr_int"), + Arrays.asList("row.nested_str", "row.nested_row.doubly_nested_str"), + Arrays.asList("nullable_row.nested_row.doubly_nested_int")); + + for (List fields : goodFields) { + RowFilter.validateSchemaContainsFields(ROW_SCHEMA, fields, "test-operation"); } - - @Test - public void testSchemaValidationFailsWithHelpfulErrorForInvalidNestedFields() { - List, List>> nonNestedFields = + } + + @Test + public void testSchemaValidationFailsWithHelpfulErrorForMissingFields() { + List, List>> nonExistentFields = + Arrays.asList( + KV.of( + Arrays.asList("nonexistent_1", "nonexistent_2", "nonexistent_3"), + Arrays.asList("nonexistent_1", "nonexistent_2", "nonexistent_3")), + KV.of( + Arrays.asList("nullable_int", "arr_int", "nonexistent"), + Collections.singletonList("nonexistent")), + KV.of( Arrays.asList( - KV.of( - Arrays.asList( - "row.nested_row", "row.nested_int", "row.nested_str.unexpected_nested"), - Collections.singletonList("row.nested_str")), - KV.of( - Arrays.asList( - "nullable_row.nested_str", - "nullable_row.nested_str.unexpected", - "row.nested_int.unexpected_2"), - Arrays.asList("nullable_row.nested_str", "row.nested_int"))); - - for (KV, List> fields : nonNestedFields) { - List allFields = fields.getKey(); - List badFields = fields.getValue(); - - IllegalArgumentException e = - assertThrows( - IllegalArgumentException.class, - () -> - RowFilter.validateSchemaContainsFields(ROW_SCHEMA, allFields, "test-operation")); - - assertThat(e.getMessage(), containsString("Validation failed for test-operation")); - assertThat( - e.getMessage(), - containsString( - "The following specified fields are not of type Row. Their nested fields could not be reached")); - for (String badField : badFields) { - assertThat(e.getMessage(), containsString(badField)); - } - } + "nullable_row.nested_row.nonexistent", "row.nonexistent", "row.nested_float"), + Arrays.asList("nullable_row.nested_row.nonexistent", "row.nonexistent"))); + + for (KV, List> fields : nonExistentFields) { + List allFields = fields.getKey(); + List badFields = fields.getValue(); + + IllegalArgumentException e = + assertThrows( + IllegalArgumentException.class, + () -> + RowFilter.validateSchemaContainsFields(ROW_SCHEMA, allFields, "test-operation")); + + assertThat(e.getMessage(), containsString("Validation failed for test-operation")); + assertThat( + e.getMessage(), + containsString("Row Schema does not contain the following specified fields")); + for (String badField : badFields) { + assertThat(e.getMessage(), containsString(badField)); + } } + } - @Test - public void testGetFieldTree() { - List fields = + @Test + public void testSchemaValidationFailsWithHelpfulErrorForInvalidNestedFields() { + List, List>> nonNestedFields = + Arrays.asList( + KV.of( Arrays.asList( - "top-level", - "top-level-2", - "top-level.nested-level", - "top-level.nested-level-2", - "top-level.nested-level.doubly-nested-level", - "top-level.nested-level.doubly-nested-level-2"); - List nestedLayer = + "row.nested_row", "row.nested_int", "row.nested_str.unexpected_nested"), + Collections.singletonList("row.nested_str")), + KV.of( Arrays.asList( - "nested-level", - "nested-level-2", - "nested-level.doubly-nested-level", - "nested-level.doubly-nested-level-2"); - - Map> expectedTree = - ImmutableMap.>builder() - .put("top-level-2", Collections.emptyList()) - .put("top-level", nestedLayer) - .build(); - - assertEquals(expectedTree, RowFilter.getFieldTree(fields)); - - List doublyNestedLayer = Arrays.asList("doubly-nested-level", "doubly-nested-level-2"); - - Map> expectedNestedTree = - ImmutableMap.>builder() - .put("nested-level-2", Collections.emptyList()) - .put("nested-level", doublyNestedLayer) - .build(); - - assertEquals(expectedNestedTree, RowFilter.getFieldTree(nestedLayer)); + "nullable_row.nested_str", + "nullable_row.nested_str.unexpected", + "row.nested_int.unexpected_2"), + Arrays.asList("nullable_row.nested_str", "row.nested_int"))); + + for (KV, List> fields : nonNestedFields) { + List allFields = fields.getKey(); + List badFields = fields.getValue(); + + IllegalArgumentException e = + assertThrows( + IllegalArgumentException.class, + () -> + RowFilter.validateSchemaContainsFields(ROW_SCHEMA, allFields, "test-operation")); + + assertThat(e.getMessage(), containsString("Validation failed for test-operation")); + assertThat( + e.getMessage(), + containsString( + "The following specified fields are not of type Row. Their nested fields could not be reached")); + for (String badField : badFields) { + assertThat(e.getMessage(), containsString(badField)); + } } - - @Test - public void testDropSchemaFields() { - List fieldsToDrop = - Arrays.asList( - "str", - "arr_int", - "nullable_int", - "row.nested_int", - "row.nested_float", - "row.nested_row.doubly_nested_int", - "nullable_row.nested_str", - "nullable_row.nested_row"); - - Schema expectedDroppedSchema = + } + + @Test + public void testGetFieldTree() { + List fields = + Arrays.asList( + "top-level", + "top-level-2", + "top-level.nested-level", + "top-level.nested-level-2", + "top-level.nested-level.doubly-nested-level", + "top-level.nested-level.doubly-nested-level-2"); + List nestedLayer = + Arrays.asList( + "nested-level", + "nested-level-2", + "nested-level.doubly-nested-level", + "nested-level.doubly-nested-level-2"); + + Map> expectedTree = + ImmutableMap.>builder() + .put("top-level-2", Collections.emptyList()) + .put("top-level", nestedLayer) + .build(); + + assertEquals(expectedTree, RowFilter.getFieldTree(fields)); + + List doublyNestedLayer = Arrays.asList("doubly-nested-level", "doubly-nested-level-2"); + + Map> expectedNestedTree = + ImmutableMap.>builder() + .put("nested-level-2", Collections.emptyList()) + .put("nested-level", doublyNestedLayer) + .build(); + + assertEquals(expectedNestedTree, RowFilter.getFieldTree(nestedLayer)); + } + + @Test + public void testDropSchemaFields() { + List fieldsToDrop = + Arrays.asList( + "str", + "arr_int", + "nullable_int", + "row.nested_int", + "row.nested_float", + "row.nested_row.doubly_nested_int", + "nullable_row.nested_str", + "nullable_row.nested_row"); + + Schema expectedDroppedSchema = + Schema.builder() + .addBooleanField("bool") + .addRowField( + "row", Schema.builder() - .addBooleanField("bool") - .addRowField( - "row", - Schema.builder() - .addStringField("nested_str") - .addRowField( - "nested_row", Schema.builder().addStringField("doubly_nested_str").build()) - .build()) - .addNullableRowField( - "nullable_row", - Schema.builder().addInt32Field("nested_int").addFloatField("nested_float").build()) - .build(); - - assertTrue(expectedDroppedSchema.equivalent(RowFilter.dropFields(ROW_SCHEMA, fieldsToDrop))); - } - - @Test - public void testKeepSchemaFields() { - List fieldsToKeep = - Arrays.asList( - "str", - "arr_int", - "nullable_int", - "row.nested_int", - "row.nested_float", - "row.nested_row.doubly_nested_int", - "nullable_row.nested_str", - "nullable_row.nested_row"); - - Schema expectedKeptSchema = + .addStringField("nested_str") + .addRowField( + "nested_row", Schema.builder().addStringField("doubly_nested_str").build()) + .build()) + .addNullableRowField( + "nullable_row", + Schema.builder().addInt32Field("nested_int").addFloatField("nested_float").build()) + .build(); + + assertTrue(expectedDroppedSchema.equivalent(RowFilter.dropFields(ROW_SCHEMA, fieldsToDrop))); + } + + @Test + public void testKeepSchemaFields() { + List fieldsToKeep = + Arrays.asList( + "str", + "arr_int", + "nullable_int", + "row.nested_int", + "row.nested_float", + "row.nested_row.doubly_nested_int", + "nullable_row.nested_str", + "nullable_row.nested_row"); + + Schema expectedKeptSchema = + Schema.builder() + .addStringField("str") + .addArrayField("arr_int", Schema.FieldType.INT32) + .addNullableInt32Field("nullable_int") + .addRowField( + "row", + Schema.builder() + .addInt32Field("nested_int") + .addFloatField("nested_float") + .addRowField( + "nested_row", Schema.builder().addInt32Field("doubly_nested_int").build()) + .build()) + .addNullableRowField( + "nullable_row", Schema.builder() - .addStringField("str") - .addArrayField("arr_int", Schema.FieldType.INT32) - .addNullableInt32Field("nullable_int") - .addRowField( - "row", - Schema.builder() - .addInt32Field("nested_int") - .addFloatField("nested_float") - .addRowField( - "nested_row", Schema.builder().addInt32Field("doubly_nested_int").build()) - .build()) - .addNullableRowField( - "nullable_row", - Schema.builder() - .addStringField("nested_str") - .addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA) - .build()) - .build(); - - assertTrue(expectedKeptSchema.equivalent(RowFilter.keepFields(ROW_SCHEMA, fieldsToKeep))); - } - - private static final Row ORIGINAL_ROW = - Row.withSchema(ROW_SCHEMA) - .addValue("str_value") - .addValue(true) - .addValue(123) - .addValue(Arrays.asList(1, 2, 3, 4, 5)) - .addValue( - Row.withSchema(NESTED_ROW_SCHEMA) - .addValue("nested_str_value") - .addValue(456) - .addValue(1.234f) - .addValue( - Row.withSchema(DOUBLY_NESTED_ROW_SCHEMA) - .addValue("doubly_nested_str_value") - .addValue(789) - .build()) - .build()) - .addValue(null) - .build(); - - private static final Schema FILTERED_DOUBLY_NESTED_SCHEMA = - Schema.builder().addStringField("doubly_nested_str").build(); - private static final Schema FILTERED_NESTED_SCHEMA = - Schema.builder() .addStringField("nested_str") - .addRowField("nested_row", FILTERED_DOUBLY_NESTED_SCHEMA) - .build(); - private static final Schema FILTERED_SCHEMA = - Schema.builder() - .addStringField("str") - .addArrayField("arr_int", Schema.FieldType.INT32) - .addRowField("row", FILTERED_NESTED_SCHEMA) - .build(); - - private static final Row FILTERED_ROW = - Row.withSchema(FILTERED_SCHEMA) - .addValue("str_value") - .addValue(Arrays.asList(1, 2, 3, 4, 5)) - .addValue( - Row.withSchema(FILTERED_NESTED_SCHEMA) - .addValue("nested_str_value") - .addValue( - Row.withSchema(FILTERED_DOUBLY_NESTED_SCHEMA) - .addValue("doubly_nested_str_value") - .build()) - .build()) - .build(); - - @Test - public void testCopyRowWithNewSchema() { - assertEquals(FILTERED_ROW, RowFilter.copyWithNewSchema(ORIGINAL_ROW, FILTERED_SCHEMA)); - } - - @Test - public void testDropRowFields() { - RowFilter rowFilter = - new RowFilter(ROW_SCHEMA) - .dropping( - Arrays.asList( - "bool", - "nullable_int", - "row.nested_int", - "row.nested_float", - "row.nested_row.doubly_nested_int", - "nullable_row")); - - assertEquals(FILTERED_ROW, rowFilter.filter(ORIGINAL_ROW)); - } - - @Test - public void testKeepRowFields() { - RowFilter rowFilter = - new RowFilter(ROW_SCHEMA) - .keeping( - Arrays.asList( - "str", "arr_int", "row.nested_str", "row.nested_row.doubly_nested_str")); + .addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA) + .build()) + .build(); + + assertTrue(expectedKeptSchema.equivalent(RowFilter.keepFields(ROW_SCHEMA, fieldsToKeep))); + } + + private static final Row ORIGINAL_ROW = + Row.withSchema(ROW_SCHEMA) + .addValue("str_value") + .addValue(true) + .addValue(123) + .addValue(Arrays.asList(1, 2, 3, 4, 5)) + .addValue( + Row.withSchema(NESTED_ROW_SCHEMA) + .addValue("nested_str_value") + .addValue(456) + .addValue(1.234f) + .addValue( + Row.withSchema(DOUBLY_NESTED_ROW_SCHEMA) + .addValue("doubly_nested_str_value") + .addValue(789) + .build()) + .build()) + .addValue(null) + .build(); + + private static final Schema FILTERED_DOUBLY_NESTED_SCHEMA = + Schema.builder().addStringField("doubly_nested_str").build(); + private static final Schema FILTERED_NESTED_SCHEMA = + Schema.builder() + .addStringField("nested_str") + .addRowField("nested_row", FILTERED_DOUBLY_NESTED_SCHEMA) + .build(); + private static final Schema FILTERED_SCHEMA = + Schema.builder() + .addStringField("str") + .addArrayField("arr_int", Schema.FieldType.INT32) + .addRowField("row", FILTERED_NESTED_SCHEMA) + .build(); + + private static final Row FILTERED_ROW = + Row.withSchema(FILTERED_SCHEMA) + .addValue("str_value") + .addValue(Arrays.asList(1, 2, 3, 4, 5)) + .addValue( + Row.withSchema(FILTERED_NESTED_SCHEMA) + .addValue("nested_str_value") + .addValue( + Row.withSchema(FILTERED_DOUBLY_NESTED_SCHEMA) + .addValue("doubly_nested_str_value") + .build()) + .build()) + .build(); + + @Test + public void testCopyRowWithNewSchema() { + assertEquals(FILTERED_ROW, RowFilter.copyWithNewSchema(ORIGINAL_ROW, FILTERED_SCHEMA)); + } + + @Test + public void testDropRowFields() { + RowFilter rowFilter = + new RowFilter(ROW_SCHEMA) + .dropping( + Arrays.asList( + "bool", + "nullable_int", + "row.nested_int", + "row.nested_float", + "row.nested_row.doubly_nested_int", + "nullable_row")); + + assertEquals(FILTERED_ROW, rowFilter.filter(ORIGINAL_ROW)); + } + + @Test + public void testKeepRowFields() { + RowFilter rowFilter = + new RowFilter(ROW_SCHEMA) + .keeping( + Arrays.asList( + "str", "arr_int", "row.nested_str", "row.nested_row.doubly_nested_str")); - assertEquals(FILTERED_ROW, rowFilter.filter(ORIGINAL_ROW)); - } -} \ No newline at end of file + assertEquals(FILTERED_ROW, rowFilter.filter(ORIGINAL_ROW)); + } +} From eabb80557f6b40c99fa8c595110e030ae0ecbd8e Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 9 Sep 2024 13:14:01 -0400 Subject: [PATCH 3/8] re-order to make public API more visible --- .../org/apache/beam/sdk/util/RowFilter.java | 150 +++++++++--------- 1 file changed, 76 insertions(+), 74 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java index d53c68d87c8b..ac328d8d1182 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java @@ -72,55 +72,6 @@ public RowFilter(Schema rowSchema) { this.rowSchema = rowSchema; } - /** - * Checks whether a {@link Schema} contains a list of field names. Nested fields can be expressed - * with dot-notation. Throws a helpful error in the case where a field doesn't exist, or if a - * nested field could not be reached. - */ - @VisibleForTesting - static void validateSchemaContainsFields( - Schema schema, List specifiedFields, String operation) { - Set notFound = new HashSet<>(); - Set notRowField = new HashSet<>(); - - for (String field : specifiedFields) { - List levels = Splitter.on(".").splitToList(field); - - Schema currentSchema = schema; - - for (int i = 0; i < levels.size(); i++) { - String currentFieldName = String.join(".", levels.subList(0, i + 1)); - - if (!currentSchema.hasField(levels.get(i))) { - notFound.add(currentFieldName); - break; - } - - if (i + 1 < levels.size()) { - Schema.Field nextField = currentSchema.getField(levels.get(i)); - if (!nextField.getType().getTypeName().equals(Schema.TypeName.ROW)) { - notRowField.add(currentFieldName); - break; - } - currentSchema = Preconditions.checkNotNull(nextField.getType().getRowSchema()); - } - } - } - - if (!notFound.isEmpty() || !notRowField.isEmpty()) { - String message = "Validation failed for " + operation + "."; - if (!notFound.isEmpty()) { - message += "\nRow Schema does not contain the following specified fields: " + notFound; - } - if (!notRowField.isEmpty()) { - message += - "\nThe following specified fields are not of type Row. Their nested fields could not be reached: " - + notRowField; - } - throw new IllegalArgumentException(message); - } - } - /** * Configures this {@link RowFilter} to filter {@link Row}s by keeping only the specified fields. * Nested fields can be specified using dot-notation. @@ -187,6 +138,82 @@ public RowFilter dropping(List fields) { return this; } + /** + * Performs a filter operation (keep or drop) on the input {@link Row}. Must have already + * configured a filter operation with {@link #dropping(List)} or {@link #keeping(List)} for this + * {@link RowFilter}. + * + *

If not yet configured, will simply return the same {@link Row}. + */ + public Row filter(Row row) { + if (transformedSchema == null) { + return row; + } + Preconditions.checkState( + row.getSchema().assignableTo(rowSchema), + "Encountered Row with schema that is incompatible with this RowFilter's schema." + + "\nRow schema: %s" + + "\nSchema used to initialize this RowFilter: %s", + row.getSchema(), + rowSchema); + + return Preconditions.checkNotNull(copyWithNewSchema(row, outputSchema())); + } + + /** Returns the output {@link Row}'s {@link Schema}. */ + public Schema outputSchema() { + return transformedSchema != null ? transformedSchema : rowSchema; + } + + /** + * Checks whether a {@link Schema} contains a list of field names. Nested fields can be expressed + * with dot-notation. Throws a helpful error in the case where a field doesn't exist, or if a + * nested field could not be reached. + */ + @VisibleForTesting + static void validateSchemaContainsFields( + Schema schema, List specifiedFields, String operation) { + Set notFound = new HashSet<>(); + Set notRowField = new HashSet<>(); + + for (String field : specifiedFields) { + List levels = Splitter.on(".").splitToList(field); + + Schema currentSchema = schema; + + for (int i = 0; i < levels.size(); i++) { + String currentFieldName = String.join(".", levels.subList(0, i + 1)); + + if (!currentSchema.hasField(levels.get(i))) { + notFound.add(currentFieldName); + break; + } + + if (i + 1 < levels.size()) { + Schema.Field nextField = currentSchema.getField(levels.get(i)); + if (!nextField.getType().getTypeName().equals(Schema.TypeName.ROW)) { + notRowField.add(currentFieldName); + break; + } + currentSchema = Preconditions.checkNotNull(nextField.getType().getRowSchema()); + } + } + } + + if (!notFound.isEmpty() || !notRowField.isEmpty()) { + String message = "Validation failed for " + operation + "."; + if (!notFound.isEmpty()) { + message += "\nRow Schema does not contain the following specified fields: " + notFound; + } + if (!notRowField.isEmpty()) { + message += + "\nThe following specified fields are not of type Row. Their nested fields could not be reached: " + + notRowField; + } + throw new IllegalArgumentException(message); + } + } + /** * Creates a field tree, separating each top-level field from its (potential) nested fields. E.g. * ["foo.bar.baz", "foo.abc", "xyz"] --> {"foo": ["bar.baz", "abc"], "xyz": []} @@ -319,29 +346,4 @@ static Schema keepFields(Schema schema, List fieldsToKeep) { return new Schema(newFieldsList); } - - /** - * Performs a filter operation (keep or drop) on the input {@link Row}. Must have already - * configured a filter operation with {@link #dropping(List)} or {@link #keeping(List)} for this - * {@link RowFilter}. - * - *

If not yet configured, will simply return the same {@link Row}. - */ - public Row filter(Row row) { - if (transformedSchema == null) { - return row; - } - Preconditions.checkState( - row.getSchema().assignableTo(rowSchema), - "Encountered Row with schema that is incompatible with this RowFilter's schema.\nRow schema: %s\nSchema used to initialize this RowFilter: %s", - row.getSchema(), - rowSchema); - - return Preconditions.checkNotNull(copyWithNewSchema(row, outputSchema())); - } - - /** Returns the output {@link Row}'s {@link Schema}. */ - public Schema outputSchema() { - return transformedSchema != null ? transformedSchema : rowSchema; - } } From 255872552063c47d39e38e072961e7a6ddb023b6 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 9 Sep 2024 18:41:14 -0400 Subject: [PATCH 4/8] add unnest filter operation and tests --- .../org/apache/beam/sdk/util/RowFilter.java | 122 ++++++++++++++++-- .../apache/beam/sdk/util/RowFilterTest.java | 77 +++++++++++ 2 files changed, 191 insertions(+), 8 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java index ac328d8d1182..736d54b4e9bb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.util; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; @@ -96,10 +98,7 @@ public RowFilter(Schema rowSchema) { * } */ public RowFilter keeping(List fields) { - Preconditions.checkState( - transformedSchema == null, - "This RowFilter has already been configured to filter to the following Schema: %s", - transformedSchema); + checkUnconfigured(); validateSchemaContainsFields(rowSchema, fields, "\"keep\""); transformedSchema = keepFields(rowSchema, fields); return this; @@ -129,15 +128,55 @@ public RowFilter keeping(List fields) { * } */ public RowFilter dropping(List fields) { - Preconditions.checkState( - transformedSchema == null, - "This RowFilter has already been configured to filter to the following Schema: %s", - transformedSchema); + checkUnconfigured(); validateSchemaContainsFields(rowSchema, fields, "\"drop\""); transformedSchema = dropFields(rowSchema, fields); return this; } + /** + * Configures this {@link RowFilter} to unnest the specified fields to the top-level and keeping + * their leaf names. The unnested fields are kept and everything else is dropped. This will fail + * if two fields have identical leaf names. Nested fields can be specified using dot-notation. + * + *

For example, if we want to unnest the list of fields {@code ["abc", "foo.bar", + * "foo.xyz.baz"]}, for this input {@link Row}: + * + *

{@code
+   * abc: 123
+   * foo:
+   *   bar: my_str
+   *   xyz:
+   *     baz: 456
+   *     qwe: 789
+   * }
+ * + * we will get the following output {@link Row}: + * + *
{@code
+   * abc: 123
+   * bar: my_str
+   * baz: 456
+   * }
+ * + * Note that fields should not have duplicate leaf names. For example, the {@link RowFilter} will + * fail when configuring to unnest the list of fields {@code ["abc.bar", "foo.baz.bar"]} because + * there is a duplicate leaf name "bar". + */ + public RowFilter unnesting(List fields) { + checkUnconfigured(); + validateSchemaContainsFields(rowSchema, fields, "\"unnest\""); + transformedSchema = unnestFields(rowSchema, fields); + List> fieldPaths = new ArrayList<>(fields.size()); + for (String fieldPath : fields) { + fieldPaths.add(Splitter.on(".").splitToList(fieldPath)); + } + fieldPathsToUnnest = fieldPaths; + return this; + } + + private @Nullable List> fieldPathsToUnnest; + /** * Performs a filter operation (keep or drop) on the input {@link Row}. Must have already * configured a filter operation with {@link #dropping(List)} or {@link #keeping(List)} for this @@ -149,6 +188,11 @@ public Row filter(Row row) { if (transformedSchema == null) { return row; } + // unnesting case + if (fieldPathsToUnnest != null) { + return unnestRowValues(row); + } + Preconditions.checkState( row.getSchema().assignableTo(rowSchema), "Encountered Row with schema that is incompatible with this RowFilter's schema." @@ -165,6 +209,13 @@ public Schema outputSchema() { return transformedSchema != null ? transformedSchema : rowSchema; } + private void checkUnconfigured() { + Preconditions.checkState( + transformedSchema == null, + "This RowFilter has already been configured to filter to the following Schema: %s", + transformedSchema); + } + /** * Checks whether a {@link Schema} contains a list of field names. Nested fields can be expressed * with dot-notation. Throws a helpful error in the case where a field doesn't exist, or if a @@ -261,6 +312,21 @@ static Row copyWithNewSchema(@Nullable Row row, Schema newSchema) { return Row.withSchema(newSchema).withFieldValues(values).build(); } + /** Unnests the specified field values in this Row and outputs a new flattened Row. */ + private Row unnestRowValues(Row row) { + Row.Builder builder = Row.withSchema(checkStateNotNull(transformedSchema)); + for (List fieldPath : checkStateNotNull(fieldPathsToUnnest)) { + Row traversingRow = row; + int i = 0; + while (i < fieldPath.size() - 1) { + traversingRow = checkStateNotNull(traversingRow.getRow(fieldPath.get(i++))); + } + + builder.addValue(traversingRow.getValue(fieldPath.get(i))); + } + return builder.build(); + } + /** * Returns a new {@link Schema} with the specified fields removed. * @@ -346,4 +412,44 @@ static Schema keepFields(Schema schema, List fieldsToKeep) { return new Schema(newFieldsList); } + + @VisibleForTesting + static Schema.Field getNestedField(Schema schema, List fieldPath) { + Preconditions.checkState( + !fieldPath.isEmpty(), "Unexpected call to get nested field without providing a name."); + Schema.Field field = schema.getField(fieldPath.get(0)); + + if (fieldPath.size() == 1) { + return field; + } + Preconditions.checkState( + field.getType().getTypeName().equals(Schema.TypeName.ROW), + "Expected type %s for specified nested field '%s', but instead got type %s.", + Schema.TypeName.ROW, + field, + field.getType().getTypeName()); + + return getNestedField( + checkStateNotNull(field.getType().getRowSchema()), fieldPath.subList(1, fieldPath.size())); + } + + /** + * Returns a new {@link Schema} where the specified fields are unnested and placed at the top + * level. + * + *

No guarantee that field ordering will remain the same. + */ + @VisibleForTesting + static Schema unnestFields(Schema schema, List fieldsToUnnest) { + if (fieldsToUnnest.isEmpty()) { + return schema; + } + List newFieldsList = new ArrayList<>(fieldsToUnnest.size()); + for (String fieldPath : fieldsToUnnest) { + Schema.Field field = getNestedField(schema, Splitter.on(".").splitToList(fieldPath)); + newFieldsList.add(field); + } + + return new Schema(newFieldsList); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java index d1779f3d8465..844a93bb7416 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -30,6 +31,7 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.junit.Rule; import org.junit.Test; @@ -251,6 +253,56 @@ public void testKeepSchemaFields() { assertTrue(expectedKeptSchema.equivalent(RowFilter.keepFields(ROW_SCHEMA, fieldsToKeep))); } + @Test + public void testGetNestedField() { + List fieldPaths = + Arrays.asList( + "str", "row.nested_int", "row.nested_row.doubly_nested_int", "row.nested_row"); + List expectedFields = + Arrays.asList( + Schema.Field.of("str", Schema.FieldType.STRING), + Schema.Field.of("nested_int", Schema.FieldType.INT32), + Schema.Field.of("doubly_nested_int", Schema.FieldType.INT32), + Schema.Field.of("nested_row", Schema.FieldType.row(DOUBLY_NESTED_ROW_SCHEMA))); + + List actualFields = new ArrayList<>(fieldPaths.size()); + for (String fieldPath : fieldPaths) { + actualFields.add( + RowFilter.getNestedField(ROW_SCHEMA, Splitter.on(".").splitToList(fieldPath))); + } + + assertEquals(expectedFields, actualFields); + } + + @Test + public void testUnnestSchemaFields() { + List fieldsToUnnest = + Arrays.asList( + "str", + "arr_int", + "nullable_int", + "row.nested_int", + "row.nested_float", + "row.nested_row.doubly_nested_int", + "nullable_row.nested_str", + "nullable_row.nested_row"); + + Schema expectedUnnestedSchema = + Schema.builder() + .addStringField("str") + .addArrayField("arr_int", Schema.FieldType.INT32) + .addNullableInt32Field("nullable_int") + .addInt32Field("nested_int") + .addFloatField("nested_float") + .addInt32Field("doubly_nested_int") + .addStringField("nested_str") + .addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA) + .build(); + + assertTrue( + expectedUnnestedSchema.equivalent(RowFilter.unnestFields(ROW_SCHEMA, fieldsToUnnest))); + } + private static final Row ORIGINAL_ROW = Row.withSchema(ROW_SCHEMA) .addValue("str_value") @@ -330,4 +382,29 @@ public void testKeepRowFields() { assertEquals(FILTERED_ROW, rowFilter.filter(ORIGINAL_ROW)); } + + @Test + public void testUnnestRowFields() { + List unnestFields = + Arrays.asList( + "str", + "arr_int", + "row.nested_str", + "row.nested_row.doubly_nested_str", + "row.nested_row"); + RowFilter rowFilter = new RowFilter(ROW_SCHEMA).unnesting(unnestFields); + + Schema unnestedSchema = RowFilter.unnestFields(ROW_SCHEMA, unnestFields); + Row expecedRow = + Row.withSchema(unnestedSchema) + .addValues( + ORIGINAL_ROW.getString("str"), + ORIGINAL_ROW.getArray("arr_int"), + ORIGINAL_ROW.getRow("row").getString("nested_str"), + ORIGINAL_ROW.getRow("row").getRow("nested_row").getString("doubly_nested_str"), + ORIGINAL_ROW.getRow("row").getRow("nested_row")) + .build(); + + assertEquals(expecedRow, rowFilter.filter(ORIGINAL_ROW)); + } } From 1fedf835186b66363eb7d0242366ee0b8760633d Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 16 Sep 2024 07:10:44 -0400 Subject: [PATCH 5/8] fail when nested fields are specified for 'keep' or 'drop' --- .../org/apache/beam/sdk/util/RowFilter.java | 48 ++++++++++++++----- .../apache/beam/sdk/util/RowFilterTest.java | 10 +++- 2 files changed, 43 insertions(+), 15 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java index 736d54b4e9bb..78a7c144ba3f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java @@ -36,12 +36,14 @@ /** * A utility that filters fields from Beam {@link Row}s. This filter can be configured to indicate - * what fields you would like to either keep or drop. Afterward, - * call {@link #filter(Row)} on a Schema-compatible Row to filter it. An un-configured filter will - * simply return the input row untouched. + * what fields you would like to either keep, drop, or + * unnest. Afterward, call {@link #filter(Row)} on a Schema-compatible Row to + * filter it. An un-configured filter will simply return the input row untouched. * *

Nested fields can be expressed using dot-notation (e.g. {@code "top.middle.nested"}). * + *

Note: Nested fields are only supported for unnest. + * *

A configured {@link RowFilter} will naturally produce {@link Row}s with a new Beam {@link * Schema}. You can access this new Schema ahead of time via the filter's {@link #outputSchema()}. * @@ -51,20 +53,23 @@ * // this is an un-configured filter * RowFilter unconfigured = new RowFilter(beamSchema); * - * List fields = Arrays.asList("foo", "bar.xyz", "baz.abc.qwe"); - * * // this filter will exclusively keep these fields and drop everything else + * List fields = Arrays.asList("foo", "bar", "baz"); * RowFilter keepingFilter = new RowFilter(beamSchema).keeping(fields); * * // this filter will drop these fields * RowFilter droppingFilter = new RowFilter(beamSchema).dropping(fields); * + * // this filter will unnest fields "abc" and "xyz" to the top level + * List fields = Arrays.asList("foo.abc", "bar.baz.xyz"); + * RowFilter unnestingFilter = new RowFilter(beamSchema).unnesting(fields); + * * // produces a filtered row * Row outputRow = keepingFilter.filter(row); * } * - * Check the documentation for {@link #keeping(List)} and {@link #dropping(List)} for further - * details on what a filtered Row can look like. + * Check the documentation for {@link #keeping(List)}, {@link #dropping(List)}, and {@link + * #unnesting(List)} for further details on what an output Row can look like. */ public class RowFilter implements Serializable { private final Schema rowSchema; @@ -78,8 +83,8 @@ public RowFilter(Schema rowSchema) { * Configures this {@link RowFilter} to filter {@link Row}s by keeping only the specified fields. * Nested fields can be specified using dot-notation. * - *

For example, if we want to keep the list of fields {@code ["foo", "baz.nested_1"]}, for the - * input {@link Row}: + *

For example, if we want to keep the list of fields {@code ["foo", "baz"]}, for the input + * {@link Row}: * *

{@code
    * foo: 123
@@ -95,10 +100,12 @@ public RowFilter(Schema rowSchema) {
    * foo: 123
    * baz
    *   nested_1: abc
+   *   nested_2: xyz
    * }
*/ public RowFilter keeping(List fields) { checkUnconfigured(); + verifyNoNestedFields(fields, "keep"); validateSchemaContainsFields(rowSchema, fields, "\"keep\""); transformedSchema = keepFields(rowSchema, fields); return this; @@ -108,8 +115,8 @@ public RowFilter keeping(List fields) { * Configures this {@link RowFilter} to filter {@link Row} by removing the specified fields. * Nested fields can be specified using dot-notation. * - *

For example, if we want to drop the list of fields {@code ["foo", "baz.nested_1"]}, for this - * input {@link Row}: + *

For example, if we want to drop the list of fields {@code ["foo", "baz"]}, for this input + * {@link Row}: * *

{@code
    * foo: 123
@@ -123,12 +130,11 @@ public RowFilter keeping(List fields) {
    *
    * 
{@code
    * bar: 456
-   * baz:
-   *   nested_2: xyz
    * }
*/ public RowFilter dropping(List fields) { checkUnconfigured(); + verifyNoNestedFields(fields, "drop"); validateSchemaContainsFields(rowSchema, fields, "\"drop\""); transformedSchema = dropFields(rowSchema, fields); return this; @@ -216,6 +222,22 @@ private void checkUnconfigured() { transformedSchema); } + /** Verifies that this selection contains no nested fields. */ + private void verifyNoNestedFields(List fields, String operation) { + List nestedFields = new ArrayList<>(); + for (String field : fields) { + if (field.contains(".")) { + nestedFields.add(field); + } + } + if (!nestedFields.isEmpty()) { + throw new IllegalArgumentException( + String.format( + "RowFilter does not support specifying nested fields to %s: %s", + operation, nestedFields)); + } + } + /** * Checks whether a {@link Schema} contains a list of field names. Nested fields can be expressed * with dot-notation. Throws a helpful error in the case where a field doesn't exist, or if a diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java index 844a93bb7416..e7e6c4e761d2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java @@ -357,7 +357,10 @@ public void testCopyRowWithNewSchema() { } @Test - public void testDropRowFields() { + public void testDropNestedFieldsFails() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("RowFilter does not support specifying nested fields to drop"); + RowFilter rowFilter = new RowFilter(ROW_SCHEMA) .dropping( @@ -373,7 +376,10 @@ public void testDropRowFields() { } @Test - public void testKeepRowFields() { + public void testKeepNestedFieldsFails() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("RowFilter does not support specifying nested fields to keep"); + RowFilter rowFilter = new RowFilter(ROW_SCHEMA) .keeping( From 50540d34f7d939dc8e778901e4fea1cdb9aefb76 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 16 Sep 2024 07:16:40 -0400 Subject: [PATCH 6/8] doc nit --- .../src/main/java/org/apache/beam/sdk/util/RowFilter.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java index 78a7c144ba3f..cae553893e7f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java @@ -42,10 +42,11 @@ * *

Nested fields can be expressed using dot-notation (e.g. {@code "top.middle.nested"}). * - *

Note: Nested fields are only supported for unnest. + *

Note: You can only specify nested fields when the filter is configured to + * unnest. * *

A configured {@link RowFilter} will naturally produce {@link Row}s with a new Beam {@link - * Schema}. You can access this new Schema ahead of time via the filter's {@link #outputSchema()}. + * Schema}. You can access this new Schema via the filter's {@link #outputSchema()}. * *

Configure a {@link RowFilter} as follows: * From df74050fce4c4a295ed31e10abd6ad6c151ebe07 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 18 Sep 2024 17:25:33 -0400 Subject: [PATCH 7/8] update implementation to unnest all fields under a row --- .../org/apache/beam/sdk/util/RowFilter.java | 91 ++++++----- .../apache/beam/sdk/util/RowFilterTest.java | 141 ++++++++---------- 2 files changed, 117 insertions(+), 115 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java index cae553893e7f..59bcd4929e0f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java @@ -42,9 +42,6 @@ * *

Nested fields can be expressed using dot-notation (e.g. {@code "top.middle.nested"}). * - *

Note: You can only specify nested fields when the filter is configured to - * unnest. - * *

A configured {@link RowFilter} will naturally produce {@link Row}s with a new Beam {@link * Schema}. You can access this new Schema via the filter's {@link #outputSchema()}. * @@ -61,8 +58,8 @@ * // this filter will drop these fields * RowFilter droppingFilter = new RowFilter(beamSchema).dropping(fields); * - * // this filter will unnest fields "abc" and "xyz" to the top level - * List fields = Arrays.asList("foo.abc", "bar.baz.xyz"); + * // this filter will unnest all fields under "row_a" and "row_b" to the top level + * List fields = Arrays.asList("row_a", "row_b"); * RowFilter unnestingFilter = new RowFilter(beamSchema).unnesting(fields); * * // produces a filtered row @@ -107,7 +104,7 @@ public RowFilter(Schema rowSchema) { public RowFilter keeping(List fields) { checkUnconfigured(); verifyNoNestedFields(fields, "keep"); - validateSchemaContainsFields(rowSchema, fields, "\"keep\""); + validateSchemaContainsFields(rowSchema, fields, "keep"); transformedSchema = keepFields(rowSchema, fields); return this; } @@ -136,23 +133,23 @@ public RowFilter keeping(List fields) { public RowFilter dropping(List fields) { checkUnconfigured(); verifyNoNestedFields(fields, "drop"); - validateSchemaContainsFields(rowSchema, fields, "\"drop\""); + validateSchemaContainsFields(rowSchema, fields, "drop"); transformedSchema = dropFields(rowSchema, fields); return this; } /** - * Configures this {@link RowFilter} to unnest the specified fields to the top-level and keeping - * their leaf names. The unnested fields are kept and everything else is dropped. This will fail - * if two fields have identical leaf names. Nested fields can be specified using dot-notation. + * Configures this {@link RowFilter} to unnest everything under the specified row fields to the + * top-level. The unnested sub-fields are kept with their original names. * - *

For example, if we want to unnest the list of fields {@code ["abc", "foo.bar", - * "foo.xyz.baz"]}, for this input {@link Row}: + *

For example, if we want to unnest the sub-fields of {@code ["abc", "foo"]}, for this input + * {@link Row}: * *

{@code
-   * abc: 123
-   * foo:
+   * wkl: 123
+   * abc:
    *   bar: my_str
+   * foo:
    *   xyz:
    *     baz: 456
    *     qwe: 789
@@ -161,19 +158,34 @@ public RowFilter dropping(List fields) {
    * we will get the following output {@link Row}:
    *
    * 
{@code
-   * abc: 123
    * bar: my_str
-   * baz: 456
+   * xyz:
+   *   baz: 456
+   *   qwe: 789
+   * }
+ * + *

Note that this will fail if two specified row fields contain sub-fields with identical + * names, as that would result in duplicates. This will also fail if a specified field is not of + * type {@link Row}. + * + *

For example, the {@link RowFilter} will fail when configuring to unnest the fields {@code + * ["abc", "foo"]} if they look like this: + * + *

{@code
+   * abc:
+   *   bar: my_str
+   * foo:
+   *   baz: 456
+   *   bar: another_str
    * }
* - * Note that fields should not have duplicate leaf names. For example, the {@link RowFilter} will - * fail when configuring to unnest the list of fields {@code ["abc.bar", "foo.baz.bar"]} because - * there is a duplicate leaf name "bar". + * because this will lead to two fields on the same level named "bar". */ public RowFilter unnesting(List fields) { checkUnconfigured(); - validateSchemaContainsFields(rowSchema, fields, "\"unnest\""); - transformedSchema = unnestFields(rowSchema, fields); + verifyNoNestedFields(fields, "unnest"); + validateSchemaContainsFields(rowSchema, fields, "unnest"); + transformedSchema = unnestRowFields(rowSchema, fields); List> fieldPaths = new ArrayList<>(fields.size()); for (String fieldPath : fields) { fieldPaths.add(Splitter.on(".").splitToList(fieldPath)); @@ -275,7 +287,7 @@ static void validateSchemaContainsFields( } if (!notFound.isEmpty() || !notRowField.isEmpty()) { - String message = "Validation failed for " + operation + "."; + String message = "Validation failed for '" + operation + "'."; if (!notFound.isEmpty()) { message += "\nRow Schema does not contain the following specified fields: " + notFound; } @@ -335,17 +347,17 @@ static Row copyWithNewSchema(@Nullable Row row, Schema newSchema) { return Row.withSchema(newSchema).withFieldValues(values).build(); } - /** Unnests the specified field values in this Row and outputs a new flattened Row. */ + /** Unnests the specified Row field values and outputs a new Row. */ private Row unnestRowValues(Row row) { Row.Builder builder = Row.withSchema(checkStateNotNull(transformedSchema)); for (List fieldPath : checkStateNotNull(fieldPathsToUnnest)) { Row traversingRow = row; int i = 0; - while (i < fieldPath.size() - 1) { + while (i < fieldPath.size()) { traversingRow = checkStateNotNull(traversingRow.getRow(fieldPath.get(i++))); } - builder.addValue(traversingRow.getValue(fieldPath.get(i))); + builder.addValues(traversingRow.getValues()); } return builder.build(); } @@ -437,40 +449,43 @@ static Schema keepFields(Schema schema, List fieldsToKeep) { } @VisibleForTesting - static Schema.Field getNestedField(Schema schema, List fieldPath) { + static Schema.Field getRowFieldToUnnest(Schema schema, List fieldPath) { Preconditions.checkState( !fieldPath.isEmpty(), "Unexpected call to get nested field without providing a name."); Schema.Field field = schema.getField(fieldPath.get(0)); - if (fieldPath.size() == 1) { - return field; - } - Preconditions.checkState( + Preconditions.checkArgument( field.getType().getTypeName().equals(Schema.TypeName.ROW), - "Expected type %s for specified nested field '%s', but instead got type %s.", + "Expected type '%s' for field '%s', but instead got type '%s'.", Schema.TypeName.ROW, - field, + field.getName(), field.getType().getTypeName()); - return getNestedField( + if (fieldPath.size() == 1) { + return field; + } + + return getRowFieldToUnnest( checkStateNotNull(field.getType().getRowSchema()), fieldPath.subList(1, fieldPath.size())); } /** - * Returns a new {@link Schema} where the specified fields are unnested and placed at the top - * level. + * Takes a {@link Schema} and a list of field paths that refer to {@link Row} fields. + * + *

For each row field, its sub-fields are unnested and placed at the top level. * *

No guarantee that field ordering will remain the same. */ @VisibleForTesting - static Schema unnestFields(Schema schema, List fieldsToUnnest) { + static Schema unnestRowFields(Schema schema, List fieldsToUnnest) { if (fieldsToUnnest.isEmpty()) { return schema; } List newFieldsList = new ArrayList<>(fieldsToUnnest.size()); for (String fieldPath : fieldsToUnnest) { - Schema.Field field = getNestedField(schema, Splitter.on(".").splitToList(fieldPath)); - newFieldsList.add(field); + Schema.Field field = getRowFieldToUnnest(schema, Splitter.on(".").splitToList(fieldPath)); + + newFieldsList.addAll(checkStateNotNull(field.getType().getRowSchema()).getFields()); } return new Schema(newFieldsList); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java index e7e6c4e761d2..e6f1cb3d39fe 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java @@ -253,22 +253,67 @@ public void testKeepSchemaFields() { assertTrue(expectedKeptSchema.equivalent(RowFilter.keepFields(ROW_SCHEMA, fieldsToKeep))); } + @Test + public void testDropNestedFieldsFails() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("RowFilter does not support specifying nested fields to drop"); + + new RowFilter(ROW_SCHEMA) + .dropping( + Arrays.asList( + "bool", + "nullable_int", + "row.nested_int", + "row.nested_float", + "row.nested_row.doubly_nested_int", + "nullable_row")); + } + + @Test + public void testKeepNestedFieldsFails() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("RowFilter does not support specifying nested fields to keep"); + + new RowFilter(ROW_SCHEMA) + .keeping( + Arrays.asList("str", "arr_int", "row.nested_str", "row.nested_row.doubly_nested_str")); + } + + @Test + public void testUnnestingFailsWhenSpecifyingNonRowField() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "Expected type 'ROW' for specified field 'nested_int', but instead got type 'INT32'"); + + List invalidFields = Collections.singletonList("row.nested_int"); + + new RowFilter(ROW_SCHEMA).unnesting(invalidFields); + } + + @Test + public void testUnnestFailsOnDuplicateNestedFieldNames() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Duplicate field nested_str"); + + // "row" and "nullable_row" have identical schemas. unnesting their + // fields will result in duplicate field names + List rowFieldsToUnnest = Arrays.asList("row", "nullable_row"); + + RowFilter.unnestRowFields(ROW_SCHEMA, rowFieldsToUnnest); + } + @Test public void testGetNestedField() { - List fieldPaths = - Arrays.asList( - "str", "row.nested_int", "row.nested_row.doubly_nested_int", "row.nested_row"); + List fieldPaths = Arrays.asList("row", "nullable_row.nested_row"); List expectedFields = Arrays.asList( - Schema.Field.of("str", Schema.FieldType.STRING), - Schema.Field.of("nested_int", Schema.FieldType.INT32), - Schema.Field.of("doubly_nested_int", Schema.FieldType.INT32), + Schema.Field.of("row", Schema.FieldType.row(NESTED_ROW_SCHEMA)), Schema.Field.of("nested_row", Schema.FieldType.row(DOUBLY_NESTED_ROW_SCHEMA))); List actualFields = new ArrayList<>(fieldPaths.size()); for (String fieldPath : fieldPaths) { actualFields.add( - RowFilter.getNestedField(ROW_SCHEMA, Splitter.on(".").splitToList(fieldPath))); + RowFilter.getRowFieldToUnnest(ROW_SCHEMA, Splitter.on(".").splitToList(fieldPath))); } assertEquals(expectedFields, actualFields); @@ -276,31 +321,17 @@ public void testGetNestedField() { @Test public void testUnnestSchemaFields() { - List fieldsToUnnest = - Arrays.asList( - "str", - "arr_int", - "nullable_int", - "row.nested_int", - "row.nested_float", - "row.nested_row.doubly_nested_int", - "nullable_row.nested_str", - "nullable_row.nested_row"); + List rowFieldsToUnnest = Arrays.asList("row", "row.nested_row"); - Schema expectedUnnestedSchema = - Schema.builder() - .addStringField("str") - .addArrayField("arr_int", Schema.FieldType.INT32) - .addNullableInt32Field("nullable_int") - .addInt32Field("nested_int") - .addFloatField("nested_float") - .addInt32Field("doubly_nested_int") - .addStringField("nested_str") - .addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA) - .build(); + List fieldList = new ArrayList<>(); + fieldList.addAll(NESTED_ROW_SCHEMA.getFields()); + fieldList.addAll(DOUBLY_NESTED_ROW_SCHEMA.getFields()); + + Schema expectedUnnestedSchema = new Schema(fieldList); assertTrue( - expectedUnnestedSchema.equivalent(RowFilter.unnestFields(ROW_SCHEMA, fieldsToUnnest))); + expectedUnnestedSchema.equivalent( + RowFilter.unnestRowFields(ROW_SCHEMA, rowFieldsToUnnest))); } private static final Row ORIGINAL_ROW = @@ -356,59 +387,15 @@ public void testCopyRowWithNewSchema() { assertEquals(FILTERED_ROW, RowFilter.copyWithNewSchema(ORIGINAL_ROW, FILTERED_SCHEMA)); } - @Test - public void testDropNestedFieldsFails() { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("RowFilter does not support specifying nested fields to drop"); - - RowFilter rowFilter = - new RowFilter(ROW_SCHEMA) - .dropping( - Arrays.asList( - "bool", - "nullable_int", - "row.nested_int", - "row.nested_float", - "row.nested_row.doubly_nested_int", - "nullable_row")); - - assertEquals(FILTERED_ROW, rowFilter.filter(ORIGINAL_ROW)); - } - - @Test - public void testKeepNestedFieldsFails() { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("RowFilter does not support specifying nested fields to keep"); - - RowFilter rowFilter = - new RowFilter(ROW_SCHEMA) - .keeping( - Arrays.asList( - "str", "arr_int", "row.nested_str", "row.nested_row.doubly_nested_str")); - - assertEquals(FILTERED_ROW, rowFilter.filter(ORIGINAL_ROW)); - } - @Test public void testUnnestRowFields() { - List unnestFields = - Arrays.asList( - "str", - "arr_int", - "row.nested_str", - "row.nested_row.doubly_nested_str", - "row.nested_row"); + List unnestFields = Arrays.asList("row", "row.nested_row"); RowFilter rowFilter = new RowFilter(ROW_SCHEMA).unnesting(unnestFields); - Schema unnestedSchema = RowFilter.unnestFields(ROW_SCHEMA, unnestFields); Row expecedRow = - Row.withSchema(unnestedSchema) - .addValues( - ORIGINAL_ROW.getString("str"), - ORIGINAL_ROW.getArray("arr_int"), - ORIGINAL_ROW.getRow("row").getString("nested_str"), - ORIGINAL_ROW.getRow("row").getRow("nested_row").getString("doubly_nested_str"), - ORIGINAL_ROW.getRow("row").getRow("nested_row")) + Row.withSchema(rowFilter.outputSchema()) + .addValues(ORIGINAL_ROW.getRow("row").getValues()) + .addValues(ORIGINAL_ROW.getRow("row").getRow("nested_row").getValues()) .build(); assertEquals(expecedRow, rowFilter.filter(ORIGINAL_ROW)); From 50d10b1805c2199d9c3a9523a66829cb66925a2e Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 23 Sep 2024 13:54:57 -0400 Subject: [PATCH 8/8] switch implementation from 'unnest' multiple fields to 'only' select one field --- .../org/apache/beam/sdk/util/RowFilter.java | 142 +++++------------- .../apache/beam/sdk/util/RowFilterTest.java | 64 +------- 2 files changed, 43 insertions(+), 163 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java index 59bcd4929e0f..4e0d9d3ff30d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -36,11 +37,10 @@ /** * A utility that filters fields from Beam {@link Row}s. This filter can be configured to indicate - * what fields you would like to either keep, drop, or - * unnest. Afterward, call {@link #filter(Row)} on a Schema-compatible Row to - * filter it. An un-configured filter will simply return the input row untouched. - * - *

Nested fields can be expressed using dot-notation (e.g. {@code "top.middle.nested"}). + * what fields you would like to either keep or drop. You may also + * specify a singular {@link Row} field to extract with only. Afterward, call + * {@link #filter(Row)} on a Schema-compatible Row to filter it. An un-configured filter will simply + * return the input row untouched. * *

A configured {@link RowFilter} will naturally produce {@link Row}s with a new Beam {@link * Schema}. You can access this new Schema via the filter's {@link #outputSchema()}. @@ -58,20 +58,22 @@ * // this filter will drop these fields * RowFilter droppingFilter = new RowFilter(beamSchema).dropping(fields); * - * // this filter will unnest all fields under "row_a" and "row_b" to the top level - * List fields = Arrays.asList("row_a", "row_b"); - * RowFilter unnestingFilter = new RowFilter(beamSchema).unnesting(fields); + * // this filter will only output the contents of row field "my_record" + * String field = "my_record"; + * RowFilter onlyFilter = new RowFilter(beamSchema).only(field); * * // produces a filtered row * Row outputRow = keepingFilter.filter(row); * }

* * Check the documentation for {@link #keeping(List)}, {@link #dropping(List)}, and {@link - * #unnesting(List)} for further details on what an output Row can look like. + * #only(String)} for further details on what an output Row can look like. */ public class RowFilter implements Serializable { private final Schema rowSchema; private @Nullable Schema transformedSchema; + // for 'only' case + private @Nullable String onlyField; public RowFilter(Schema rowSchema) { this.rowSchema = rowSchema; @@ -139,16 +141,14 @@ public RowFilter dropping(List fields) { } /** - * Configures this {@link RowFilter} to unnest everything under the specified row fields to the - * top-level. The unnested sub-fields are kept with their original names. + * Configures this {@link RowFilter} to only output the contents of a single row field. * - *

For example, if we want to unnest the sub-fields of {@code ["abc", "foo"]}, for this input - * {@link Row}: + *

For example, if we want to only extract the contents of field "foo" for this input {@link + * Row}: * *

{@code
-   * wkl: 123
-   * abc:
-   *   bar: my_str
+   * abc: 123
+   * bar: my_str
    * foo:
    *   xyz:
    *     baz: 456
@@ -158,44 +158,30 @@ public RowFilter dropping(List fields) {
    * we will get the following output {@link Row}:
    *
    * 
{@code
-   * bar: my_str
    * xyz:
    *   baz: 456
    *   qwe: 789
    * }
* - *

Note that this will fail if two specified row fields contain sub-fields with identical - * names, as that would result in duplicates. This will also fail if a specified field is not of - * type {@link Row}. - * - *

For example, the {@link RowFilter} will fail when configuring to unnest the fields {@code - * ["abc", "foo"]} if they look like this: - * - *

{@code
-   * abc:
-   *   bar: my_str
-   * foo:
-   *   baz: 456
-   *   bar: another_str
-   * }
- * - * because this will lead to two fields on the same level named "bar". + *

Note that this will fail if the field is not of type {@link Row}, e.g. if {@code "abc"} is + * specified for the example above. */ - public RowFilter unnesting(List fields) { + public RowFilter only(String field) { checkUnconfigured(); - verifyNoNestedFields(fields, "unnest"); - validateSchemaContainsFields(rowSchema, fields, "unnest"); - transformedSchema = unnestRowFields(rowSchema, fields); - List> fieldPaths = new ArrayList<>(fields.size()); - for (String fieldPath : fields) { - fieldPaths.add(Splitter.on(".").splitToList(fieldPath)); - } - fieldPathsToUnnest = fieldPaths; + validateSchemaContainsFields(rowSchema, Collections.singletonList(field), "only"); + Schema.Field rowField = rowSchema.getField(field); + Preconditions.checkArgument( + rowField.getType().getTypeName().equals(Schema.TypeName.ROW), + "Expected type '%s' for field '%s', but instead got type '%s'.", + Schema.TypeName.ROW, + rowField.getName(), + rowField.getType().getTypeName()); + + transformedSchema = rowField.getType().getRowSchema(); + onlyField = field; return this; } - private @Nullable List> fieldPathsToUnnest; - /** * Performs a filter operation (keep or drop) on the input {@link Row}. Must have already * configured a filter operation with {@link #dropping(List)} or {@link #keeping(List)} for this @@ -207,10 +193,6 @@ public Row filter(Row row) { if (transformedSchema == null) { return row; } - // unnesting case - if (fieldPathsToUnnest != null) { - return unnestRowValues(row); - } Preconditions.checkState( row.getSchema().assignableTo(rowSchema), @@ -220,6 +202,12 @@ public Row filter(Row row) { row.getSchema(), rowSchema); + // 'only' case + if (onlyField != null) { + return checkStateNotNull(row.getRow(onlyField)); + } + + // 'keep' and 'drop' return Preconditions.checkNotNull(copyWithNewSchema(row, outputSchema())); } @@ -347,21 +335,6 @@ static Row copyWithNewSchema(@Nullable Row row, Schema newSchema) { return Row.withSchema(newSchema).withFieldValues(values).build(); } - /** Unnests the specified Row field values and outputs a new Row. */ - private Row unnestRowValues(Row row) { - Row.Builder builder = Row.withSchema(checkStateNotNull(transformedSchema)); - for (List fieldPath : checkStateNotNull(fieldPathsToUnnest)) { - Row traversingRow = row; - int i = 0; - while (i < fieldPath.size()) { - traversingRow = checkStateNotNull(traversingRow.getRow(fieldPath.get(i++))); - } - - builder.addValues(traversingRow.getValues()); - } - return builder.build(); - } - /** * Returns a new {@link Schema} with the specified fields removed. * @@ -447,47 +420,4 @@ static Schema keepFields(Schema schema, List fieldsToKeep) { return new Schema(newFieldsList); } - - @VisibleForTesting - static Schema.Field getRowFieldToUnnest(Schema schema, List fieldPath) { - Preconditions.checkState( - !fieldPath.isEmpty(), "Unexpected call to get nested field without providing a name."); - Schema.Field field = schema.getField(fieldPath.get(0)); - - Preconditions.checkArgument( - field.getType().getTypeName().equals(Schema.TypeName.ROW), - "Expected type '%s' for field '%s', but instead got type '%s'.", - Schema.TypeName.ROW, - field.getName(), - field.getType().getTypeName()); - - if (fieldPath.size() == 1) { - return field; - } - - return getRowFieldToUnnest( - checkStateNotNull(field.getType().getRowSchema()), fieldPath.subList(1, fieldPath.size())); - } - - /** - * Takes a {@link Schema} and a list of field paths that refer to {@link Row} fields. - * - *

For each row field, its sub-fields are unnested and placed at the top level. - * - *

No guarantee that field ordering will remain the same. - */ - @VisibleForTesting - static Schema unnestRowFields(Schema schema, List fieldsToUnnest) { - if (fieldsToUnnest.isEmpty()) { - return schema; - } - List newFieldsList = new ArrayList<>(fieldsToUnnest.size()); - for (String fieldPath : fieldsToUnnest) { - Schema.Field field = getRowFieldToUnnest(schema, Splitter.on(".").splitToList(fieldPath)); - - newFieldsList.addAll(checkStateNotNull(field.getType().getRowSchema()).getFields()); - } - - return new Schema(newFieldsList); - } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java index e6f1cb3d39fe..22c17f6d07c9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java @@ -23,7 +23,6 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -31,7 +30,6 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.Row; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.junit.Rule; import org.junit.Test; @@ -103,7 +101,7 @@ public void testSchemaValidationFailsWithHelpfulErrorForMissingFields() { () -> RowFilter.validateSchemaContainsFields(ROW_SCHEMA, allFields, "test-operation")); - assertThat(e.getMessage(), containsString("Validation failed for test-operation")); + assertThat(e.getMessage(), containsString("Validation failed for 'test-operation'")); assertThat( e.getMessage(), containsString("Row Schema does not contain the following specified fields")); @@ -138,7 +136,7 @@ public void testSchemaValidationFailsWithHelpfulErrorForInvalidNestedFields() { () -> RowFilter.validateSchemaContainsFields(ROW_SCHEMA, allFields, "test-operation")); - assertThat(e.getMessage(), containsString("Validation failed for test-operation")); + assertThat(e.getMessage(), containsString("Validation failed for 'test-operation'")); assertThat( e.getMessage(), containsString( @@ -280,58 +278,12 @@ public void testKeepNestedFieldsFails() { } @Test - public void testUnnestingFailsWhenSpecifyingNonRowField() { + public void testOnlyFailsWhenSpecifyingNonRowField() { thrown.expect(IllegalArgumentException.class); thrown.expectMessage( - "Expected type 'ROW' for specified field 'nested_int', but instead got type 'INT32'"); + "Expected type 'ROW' for field 'nullable_int', but instead got type 'INT32'"); - List invalidFields = Collections.singletonList("row.nested_int"); - - new RowFilter(ROW_SCHEMA).unnesting(invalidFields); - } - - @Test - public void testUnnestFailsOnDuplicateNestedFieldNames() { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Duplicate field nested_str"); - - // "row" and "nullable_row" have identical schemas. unnesting their - // fields will result in duplicate field names - List rowFieldsToUnnest = Arrays.asList("row", "nullable_row"); - - RowFilter.unnestRowFields(ROW_SCHEMA, rowFieldsToUnnest); - } - - @Test - public void testGetNestedField() { - List fieldPaths = Arrays.asList("row", "nullable_row.nested_row"); - List expectedFields = - Arrays.asList( - Schema.Field.of("row", Schema.FieldType.row(NESTED_ROW_SCHEMA)), - Schema.Field.of("nested_row", Schema.FieldType.row(DOUBLY_NESTED_ROW_SCHEMA))); - - List actualFields = new ArrayList<>(fieldPaths.size()); - for (String fieldPath : fieldPaths) { - actualFields.add( - RowFilter.getRowFieldToUnnest(ROW_SCHEMA, Splitter.on(".").splitToList(fieldPath))); - } - - assertEquals(expectedFields, actualFields); - } - - @Test - public void testUnnestSchemaFields() { - List rowFieldsToUnnest = Arrays.asList("row", "row.nested_row"); - - List fieldList = new ArrayList<>(); - fieldList.addAll(NESTED_ROW_SCHEMA.getFields()); - fieldList.addAll(DOUBLY_NESTED_ROW_SCHEMA.getFields()); - - Schema expectedUnnestedSchema = new Schema(fieldList); - - assertTrue( - expectedUnnestedSchema.equivalent( - RowFilter.unnestRowFields(ROW_SCHEMA, rowFieldsToUnnest))); + new RowFilter(ROW_SCHEMA).only("nullable_int"); } private static final Row ORIGINAL_ROW = @@ -388,14 +340,12 @@ public void testCopyRowWithNewSchema() { } @Test - public void testUnnestRowFields() { - List unnestFields = Arrays.asList("row", "row.nested_row"); - RowFilter rowFilter = new RowFilter(ROW_SCHEMA).unnesting(unnestFields); + public void testOnlyRowField() { + RowFilter rowFilter = new RowFilter(ROW_SCHEMA).only("row"); Row expecedRow = Row.withSchema(rowFilter.outputSchema()) .addValues(ORIGINAL_ROW.getRow("row").getValues()) - .addValues(ORIGINAL_ROW.getRow("row").getRow("nested_row").getValues()) .build(); assertEquals(expecedRow, rowFilter.filter(ORIGINAL_ROW));