Skip to content

Commit

Permalink
fail when nested fields are specified for 'keep' or 'drop'
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmedabu98 committed Sep 16, 2024
1 parent 2558725 commit 1fedf83
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@

/**
* A utility that filters fields from Beam {@link Row}s. This filter can be configured to indicate
* what fields you would like to either <strong>keep</strong> or <strong>drop</strong>. Afterward,
* call {@link #filter(Row)} on a Schema-compatible Row to filter it. An un-configured filter will
* simply return the input row untouched.
* what fields you would like to either <strong>keep</strong>, <strong>drop</strong>, or
* <strong>unnest</strong>. Afterward, call {@link #filter(Row)} on a Schema-compatible Row to
* filter it. An un-configured filter will simply return the input row untouched.
*
* <p>Nested fields can be expressed using dot-notation (e.g. {@code "top.middle.nested"}).
*
* <p>Note: Nested fields are only supported for <strong>unnest</strong>.
*
* <p>A configured {@link RowFilter} will naturally produce {@link Row}s with a new Beam {@link
* Schema}. You can access this new Schema ahead of time via the filter's {@link #outputSchema()}.
*
Expand All @@ -51,20 +53,23 @@
* // this is an un-configured filter
* RowFilter unconfigured = new RowFilter(beamSchema);
*
* List<String> fields = Arrays.asList("foo", "bar.xyz", "baz.abc.qwe");
*
* // this filter will exclusively keep these fields and drop everything else
* List<String> fields = Arrays.asList("foo", "bar", "baz");
* RowFilter keepingFilter = new RowFilter(beamSchema).keeping(fields);
*
* // this filter will drop these fields
* RowFilter droppingFilter = new RowFilter(beamSchema).dropping(fields);
*
* // this filter will unnest fields "abc" and "xyz" to the top level
* List<String> fields = Arrays.asList("foo.abc", "bar.baz.xyz");
* RowFilter unnestingFilter = new RowFilter(beamSchema).unnesting(fields);
*
* // produces a filtered row
* Row outputRow = keepingFilter.filter(row);
* }</pre>
*
* Check the documentation for {@link #keeping(List)} and {@link #dropping(List)} for further
* details on what a filtered Row can look like.
* Check the documentation for {@link #keeping(List)}, {@link #dropping(List)}, and {@link
* #unnesting(List)} for further details on what an output Row can look like.
*/
public class RowFilter implements Serializable {
private final Schema rowSchema;
Expand All @@ -78,8 +83,8 @@ public RowFilter(Schema rowSchema) {
* Configures this {@link RowFilter} to filter {@link Row}s by keeping only the specified fields.
* Nested fields can be specified using dot-notation.
*
* <p>For example, if we want to keep the list of fields {@code ["foo", "baz.nested_1"]}, for the
* input {@link Row}:
* <p>For example, if we want to keep the list of fields {@code ["foo", "baz"]}, for the input
* {@link Row}:
*
* <pre>{@code
* foo: 123
Expand All @@ -95,10 +100,12 @@ public RowFilter(Schema rowSchema) {
* foo: 123
* baz
* nested_1: abc
* nested_2: xyz
* }</pre>
*/
public RowFilter keeping(List<String> fields) {
checkUnconfigured();
verifyNoNestedFields(fields, "keep");
validateSchemaContainsFields(rowSchema, fields, "\"keep\"");
transformedSchema = keepFields(rowSchema, fields);
return this;
Expand All @@ -108,8 +115,8 @@ public RowFilter keeping(List<String> fields) {
* Configures this {@link RowFilter} to filter {@link Row} by removing the specified fields.
* Nested fields can be specified using dot-notation.
*
* <p>For example, if we want to drop the list of fields {@code ["foo", "baz.nested_1"]}, for this
* input {@link Row}:
* <p>For example, if we want to drop the list of fields {@code ["foo", "baz"]}, for this input
* {@link Row}:
*
* <pre>{@code
* foo: 123
Expand All @@ -123,12 +130,11 @@ public RowFilter keeping(List<String> fields) {
*
* <pre>{@code
* bar: 456
* baz:
* nested_2: xyz
* }</pre>
*/
public RowFilter dropping(List<String> fields) {
checkUnconfigured();
verifyNoNestedFields(fields, "drop");
validateSchemaContainsFields(rowSchema, fields, "\"drop\"");
transformedSchema = dropFields(rowSchema, fields);
return this;
Expand Down Expand Up @@ -216,6 +222,22 @@ private void checkUnconfigured() {
transformedSchema);
}

/** Verifies that this selection contains no nested fields. */
private void verifyNoNestedFields(List<String> fields, String operation) {
List<String> nestedFields = new ArrayList<>();
for (String field : fields) {
if (field.contains(".")) {
nestedFields.add(field);
}
}
if (!nestedFields.isEmpty()) {
throw new IllegalArgumentException(
String.format(
"RowFilter does not support specifying nested fields to %s: %s",
operation, nestedFields));
}
}

/**
* Checks whether a {@link Schema} contains a list of field names. Nested fields can be expressed
* with dot-notation. Throws a helpful error in the case where a field doesn't exist, or if a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,10 @@ public void testCopyRowWithNewSchema() {
}

@Test
public void testDropRowFields() {
public void testDropNestedFieldsFails() {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("RowFilter does not support specifying nested fields to drop");

RowFilter rowFilter =
new RowFilter(ROW_SCHEMA)
.dropping(
Expand All @@ -373,7 +376,10 @@ public void testDropRowFields() {
}

@Test
public void testKeepRowFields() {
public void testKeepNestedFieldsFails() {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("RowFilter does not support specifying nested fields to keep");

RowFilter rowFilter =
new RowFilter(ROW_SCHEMA)
.keeping(
Expand Down

0 comments on commit 1fedf83

Please sign in to comment.