From d24f39d9fc4ea7349b76f48bc334da903d59adc3 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 29 Aug 2024 15:26:41 -0400 Subject: [PATCH 1/4] RowStringInterpolator --- .../beam/sdk/util/RowStringInterpolator.java | 181 +++++++++++++++++ .../sdk/util/RowStringInterpolatorTest.java | 191 ++++++++++++++++++ 2 files changed, 372 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowStringInterpolator.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowStringInterpolatorTest.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowStringInterpolator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowStringInterpolator.java new file mode 100644 index 000000000000..9d4fa67a4d63 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowStringInterpolator.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.util; + +import java.io.Serializable; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; +import org.apache.commons.lang3.StringUtils; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Instant; + +/** + * A utility that interpolates values in a pre-determined {@link String} using an input Beam {@link + * Row}. + * + *

The {@link RowStringInterpolator} looks for field names specified inside {curly braces}. For + * example, if the interpolator is configured with the String {@code "unified {foo} and streaming"}, + * it will look for a field name {@code "foo"} in the input {@link Row} and substitute in that + * value. A {@link RowStringInterpolator} configured with a template String that contains no + * placeholders (i.e. no curly braces), it will always return that String, untouched. + * + *

Nested fields can be specified using dot-notation (e.g. {@code "top.middle.nested"}). + * + *

Configure a {@link RowStringInterpolator} like so: + * + *

{@code
+ * String template = "unified {foo} and {bar.baz}!"
+ * Row inputRow = ...
+ *
+ * RowStringInterpolator interpolator = new RowStringInterpolator(template, beamSchema);
+ * String output = interpolator.interpolate(inputRow);
+ * // output --> "unified batch and streaming!"
+ * }
+ * + *

Additionally, {@link #interpolate(Row, BoundedWindow, PaneInfo, Instant)} can be used in + * streaming scenarios when you want to include windowing metadata in the output String. To make use + * of this, include the relevant placeholder: + * + *

+ * + *

For example, your String template can look like: + * + *

{@code "unified {foo} and {bar} since {$YYYY}-{$MM}!"}
+ */ +public class RowStringInterpolator implements Serializable { + private final String template; + private final Set fieldsToReplace; + public static final String WINDOW = "$WINDOW"; + public static final String PANE_INDEX = "$PANE_INDEX"; + public static final String YYYY = "$YYYY"; + public static final String MM = "$MM"; + public static final String DD = "$DD"; + private static final Set WINDOWING_METADATA = + Sets.newHashSet(WINDOW, PANE_INDEX, YYYY, MM, DD); + + public RowStringInterpolator(String template, Schema rowSchema) { + this.template = template; + + Matcher m = Pattern.compile("\\{(.+?)}").matcher(template); + fieldsToReplace = new HashSet<>(); + while (m.find()) { + fieldsToReplace.add(StringUtils.strip(m.group(), "{}")); + } + + List rowFields = + fieldsToReplace.stream() + .filter(f -> !WINDOWING_METADATA.contains(f)) + .collect(Collectors.toList()); + + RowFilter.validateSchemaContainsFields(rowSchema, rowFields, "string interpolation"); + } + + /** Performs string interpolation on the template using values from the input {@link Row}. */ + public String interpolate(Row row) { + String interpolated = this.template; + for (String field : fieldsToReplace) { + List levels = Splitter.on(".").splitToList(field); + + Object val = MoreObjects.firstNonNull(getValue(row, levels, 0), ""); + + interpolated = interpolated.replace("{" + field + "}", String.valueOf(val)); + } + return interpolated; + } + + /** Like {@link #interpolate(Row)} but also potentially include windowing information. 
*/ + public String interpolate(Row row, BoundedWindow window, PaneInfo paneInfo, Instant timestamp) { + String interpolated = this.template; + for (String field : fieldsToReplace) { + Object val; + switch (field) { + case WINDOW: + val = window.toString(); + break; + case PANE_INDEX: + val = paneInfo.getIndex(); + break; + case YYYY: + val = timestamp.getChronology().year().get(timestamp.getMillis()); + break; + case MM: + val = timestamp.getChronology().monthOfYear().get(timestamp.getMillis()); + break; + case DD: + val = timestamp.getChronology().dayOfMonth().get(timestamp.getMillis()); + break; + default: + List levels = Splitter.on(".").splitToList(field); + val = MoreObjects.firstNonNull(getValue(row, levels, 0), ""); + break; + } + + interpolated = interpolated.replace("{" + field + "}", String.valueOf(val)); + } + return interpolated; + } + + private @Nullable Object getValue(@Nullable Row row, List fieldLevels, int index) { + if (row == null || fieldLevels.isEmpty()) { + return null; + } + Preconditions.checkState( + index < fieldLevels.size(), + "'%s' only goes %s levels deep. 
Invalid attempt to check for depth at level %s", + String.join(".", fieldLevels), + fieldLevels.size(), + index); + + String currentField = fieldLevels.get(index); + Object val; + try { + val = row.getValue(currentField); + } catch (IllegalArgumentException e) { + throw new RuntimeException( + String.format( + "Invalid row does not contain field '%s'.", + String.join(".", fieldLevels.subList(0, index + 1))), + e); + } + + // base case: we've reached the target + if (index == fieldLevels.size() - 1) { + return val; + } + + return getValue((Row) val, fieldLevels, ++index); + } +} \ No newline at end of file diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowStringInterpolatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowStringInterpolatorTest.java new file mode 100644 index 000000000000..1be111e009de --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowStringInterpolatorTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.Row; +import org.joda.time.DateTime; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +/** Test class for {@link RowStringInterpolator}. */ +public class RowStringInterpolatorTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + + private static final Schema DOUBLY_NESTED_ROW_SCHEMA = + Schema.builder() + .addStringField("doubly_nested_str") + .addInt32Field("doubly_nested_int") + .build(); + + private static final Schema NESTED_ROW_SCHEMA = + Schema.builder() + .addStringField("nested_str") + .addInt32Field("nested_int") + .addFloatField("nested_float") + .addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA) + .build(); + private static final Schema ROW_SCHEMA = + Schema.builder() + .addStringField("str") + .addBooleanField("bool") + .addInt32Field("int") + .addNullableInt32Field("nullable_int") + .addArrayField("arr_int", Schema.FieldType.INT32) + .addRowField("row", NESTED_ROW_SCHEMA) + .addNullableRowField("nullable_row", NESTED_ROW_SCHEMA) + .build(); + + @Test + public void testInvalidRowThrowsHelpfulError() { + String template = "foo {str}"; + RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA); + + Row invalidRow = Row.nullRow(Schema.builder().addNullableStringField("xyz").build()); + + thrown.expect(RuntimeException.class); + thrown.expectMessage("Invalid row does not contain field 'str'."); + + interpolator.interpolate(invalidRow); + } + + @Test + public void testInvalidRowThrowsHelpfulErrorForNestedFields() { + String template = "foo {row.nested_int}"; + RowStringInterpolator interpolator = new 
RowStringInterpolator(template, ROW_SCHEMA); + + Schema nestedSchema = Schema.builder().addNullableStringField("xyz").build(); + Row invalidRow = + Row.withSchema(Schema.builder().addNullableRowField("row", nestedSchema).build()) + .addValue(Row.nullRow(nestedSchema)) + .build(); + + thrown.expect(RuntimeException.class); + thrown.expectMessage("Invalid row does not contain field 'row.nested_int'."); + + interpolator.interpolate(invalidRow); + } + + @Test + public void testInvalidRowThrowsHelpfulErrorForDoublyNestedFields() { + String template = "foo {row.nested_row.doubly_nested_int}"; + RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA); + + Schema doublyNestedSchema = Schema.builder().addNullableStringField("xyz").build(); + Schema nestedSchema = + Schema.builder().addNullableRowField("nested_row", doublyNestedSchema).build(); + Row invalidRow = + Row.withSchema(Schema.builder().addNullableRowField("row", doublyNestedSchema).build()) + .addValue( + Row.withSchema(nestedSchema).addValue(Row.nullRow(doublyNestedSchema)).build()) + .build(); + + thrown.expect(RuntimeException.class); + thrown.expectMessage("Invalid row does not contain field 'row.nested_row.doubly_nested_int'."); + + interpolator.interpolate(invalidRow); + } + + private static final Row ROW = + Row.withSchema(ROW_SCHEMA) + .addValue("str_value") + .addValue(true) + .addValue(123) + .addValue(null) + .addValue(Arrays.asList(1, 2, 3, 4, 5)) + .addValue( + Row.withSchema(NESTED_ROW_SCHEMA) + .addValue("nested_str_value") + .addValue(456) + .addValue(1.234f) + .addValue( + Row.withSchema(DOUBLY_NESTED_ROW_SCHEMA) + .addValue("doubly_nested_str_value") + .addValue(789) + .build()) + .build()) + .addValue(null) + .build(); + + @Test + public void testTopLevelInterpolation() { + String template = "foo {str}, bar {bool}, baz {int}, xyz {nullable_int}"; + RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA); + + String output = 
interpolator.interpolate(ROW); + + assertEquals("foo str_value, bar true, baz 123, xyz ", output); + } + + @Test + public void testNestedLevelInterpolation() { + String template = "foo {str}, bar {row.nested_str}, baz {row.nested_float}"; + RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA); + + String output = interpolator.interpolate(ROW); + + assertEquals("foo str_value, bar nested_str_value, baz 1.234", output); + } + + @Test + public void testDoublyNestedInterpolation() { + String template = + "foo {str}, bar {row.nested_row.doubly_nested_str}, baz {row.nested_row.doubly_nested_int}"; + RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA); + + String output = interpolator.interpolate(ROW); + + assertEquals("foo str_value, bar doubly_nested_str_value, baz 789", output); + } + + @Test + public void testInterpolateWindowingInformation() { + String template = + String.format( + "str: {str}, window: {%s}, pane: {%s}, year: {%s}, month: {%s}, day: {%s}", + RowStringInterpolator.WINDOW, + RowStringInterpolator.PANE_INDEX, + RowStringInterpolator.YYYY, + RowStringInterpolator.MM, + RowStringInterpolator.DD); + + RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA); + + Instant instant = new DateTime(2024, 8, 28, 12, 0).toInstant(); + + String output = + interpolator.interpolate( + ROW, + GlobalWindow.INSTANCE, + PaneInfo.createPane(false, false, PaneInfo.Timing.ON_TIME, 2, 0), + instant); + String expected = + String.format( + "str: str_value, window: %s, pane: 2, year: 2024, month: 8, day: 28", + GlobalWindow.INSTANCE); + + assertEquals(expected, output); + } +} \ No newline at end of file From b178477d02fdf6c15ef2e4c9ed30df2492457497 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 29 Aug 2024 23:59:40 -0400 Subject: [PATCH 2/4] spotless --- .../beam/sdk/util/RowStringInterpolator.java | 188 +++++------ .../sdk/util/RowStringInterpolatorTest.java | 314 
+++++++++--------- 2 files changed, 251 insertions(+), 251 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowStringInterpolator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowStringInterpolator.java index 9d4fa67a4d63..a79ab89f518a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowStringInterpolator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowStringInterpolator.java @@ -43,16 +43,16 @@ *

The {@link RowStringInterpolator} looks for field names specified inside {curly braces}. For * example, if the interpolator is configured with the String {@code "unified {foo} and streaming"}, * it will look for a field name {@code "foo"} in the input {@link Row} and substitute in that - * value. A {@link RowStringInterpolator} configured with a template String that contains no - * placeholders (i.e. no curly braces), it will always return that String, untouched. + * value. If a {@link RowStringInterpolator} is configured with a template String that contains no + * placeholders (i.e. no curly braces), it will simply return that String, untouched. * *

Nested fields can be specified using dot-notation (e.g. {@code "top.middle.nested"}). * *

Configure a {@link RowStringInterpolator} like so: * *

{@code
- * String template = "unified {foo} and {bar.baz}!"
- * Row inputRow = ...
+ * String template = "unified {foo} and {bar.baz}!";
+ * Row inputRow = {foo: "batch", bar: {baz: "streaming"}, ...};
  *
  * RowStringInterpolator interpolator = new RowStringInterpolator(template, beamSchema);
  * String output = interpolator.interpolate(inputRow);
@@ -60,8 +60,8 @@
  * }
* *

Additionally, {@link #interpolate(Row, BoundedWindow, PaneInfo, Instant)} can be used in - * streaming scenarios when you want to include windowing metadata in the output String. To make use - * of this, include the relevant placeholder: + * streaming scenarios to substitute windowing metadata into the template String. To make use of + * this, use the relevant placeholder: * *

    *
  • $WINDOW: the window's string representation @@ -76,106 +76,106 @@ *
    {@code "unified {foo} and {bar} since {$YYYY}-{$MM}!"}
    */ public class RowStringInterpolator implements Serializable { - private final String template; - private final Set fieldsToReplace; - public static final String WINDOW = "$WINDOW"; - public static final String PANE_INDEX = "$PANE_INDEX"; - public static final String YYYY = "$YYYY"; - public static final String MM = "$MM"; - public static final String DD = "$DD"; - private static final Set WINDOWING_METADATA = - Sets.newHashSet(WINDOW, PANE_INDEX, YYYY, MM, DD); + private final String template; + private final Set fieldsToReplace; + public static final String WINDOW = "$WINDOW"; + public static final String PANE_INDEX = "$PANE_INDEX"; + public static final String YYYY = "$YYYY"; + public static final String MM = "$MM"; + public static final String DD = "$DD"; + private static final Set WINDOWING_METADATA = + Sets.newHashSet(WINDOW, PANE_INDEX, YYYY, MM, DD); - public RowStringInterpolator(String template, Schema rowSchema) { - this.template = template; + public RowStringInterpolator(String template, Schema rowSchema) { + this.template = template; - Matcher m = Pattern.compile("\\{(.+?)}").matcher(template); - fieldsToReplace = new HashSet<>(); - while (m.find()) { - fieldsToReplace.add(StringUtils.strip(m.group(), "{}")); - } + Matcher m = Pattern.compile("\\{(.+?)}").matcher(template); + fieldsToReplace = new HashSet<>(); + while (m.find()) { + fieldsToReplace.add(StringUtils.strip(m.group(), "{}")); + } - List rowFields = - fieldsToReplace.stream() - .filter(f -> !WINDOWING_METADATA.contains(f)) - .collect(Collectors.toList()); + List rowFields = + fieldsToReplace.stream() + .filter(f -> !WINDOWING_METADATA.contains(f)) + .collect(Collectors.toList()); - RowFilter.validateSchemaContainsFields(rowSchema, rowFields, "string interpolation"); - } + RowFilter.validateSchemaContainsFields(rowSchema, rowFields, "string interpolation"); + } - /** Performs string interpolation on the template using values from the input {@link Row}. 
*/ - public String interpolate(Row row) { - String interpolated = this.template; - for (String field : fieldsToReplace) { - List levels = Splitter.on(".").splitToList(field); + /** Performs string interpolation on the template using values from the input {@link Row}. */ + public String interpolate(Row row) { + String interpolated = this.template; + for (String field : fieldsToReplace) { + List levels = Splitter.on(".").splitToList(field); - Object val = MoreObjects.firstNonNull(getValue(row, levels, 0), ""); + Object val = MoreObjects.firstNonNull(getValue(row, levels, 0), ""); - interpolated = interpolated.replace("{" + field + "}", String.valueOf(val)); - } - return interpolated; + interpolated = interpolated.replace("{" + field + "}", String.valueOf(val)); } + return interpolated; + } - /** Like {@link #interpolate(Row)} but also potentially include windowing information. */ - public String interpolate(Row row, BoundedWindow window, PaneInfo paneInfo, Instant timestamp) { - String interpolated = this.template; - for (String field : fieldsToReplace) { - Object val; - switch (field) { - case WINDOW: - val = window.toString(); - break; - case PANE_INDEX: - val = paneInfo.getIndex(); - break; - case YYYY: - val = timestamp.getChronology().year().get(timestamp.getMillis()); - break; - case MM: - val = timestamp.getChronology().monthOfYear().get(timestamp.getMillis()); - break; - case DD: - val = timestamp.getChronology().dayOfMonth().get(timestamp.getMillis()); - break; - default: - List levels = Splitter.on(".").splitToList(field); - val = MoreObjects.firstNonNull(getValue(row, levels, 0), ""); - break; - } + /** Like {@link #interpolate(Row)} but also potentially include windowing information. 
*/ + public String interpolate(Row row, BoundedWindow window, PaneInfo paneInfo, Instant timestamp) { + String interpolated = this.template; + for (String field : fieldsToReplace) { + Object val; + switch (field) { + case WINDOW: + val = window.toString(); + break; + case PANE_INDEX: + val = paneInfo.getIndex(); + break; + case YYYY: + val = timestamp.getChronology().year().get(timestamp.getMillis()); + break; + case MM: + val = timestamp.getChronology().monthOfYear().get(timestamp.getMillis()); + break; + case DD: + val = timestamp.getChronology().dayOfMonth().get(timestamp.getMillis()); + break; + default: + List levels = Splitter.on(".").splitToList(field); + val = MoreObjects.firstNonNull(getValue(row, levels, 0), ""); + break; + } - interpolated = interpolated.replace("{" + field + "}", String.valueOf(val)); - } - return interpolated; + interpolated = interpolated.replace("{" + field + "}", String.valueOf(val)); } + return interpolated; + } - private @Nullable Object getValue(@Nullable Row row, List fieldLevels, int index) { - if (row == null || fieldLevels.isEmpty()) { - return null; - } - Preconditions.checkState( - index < fieldLevels.size(), - "'%s' only goes %s levels deep. Invalid attempt to check for depth at level %s", - String.join(".", fieldLevels), - fieldLevels.size(), - index); - - String currentField = fieldLevels.get(index); - Object val; - try { - val = row.getValue(currentField); - } catch (IllegalArgumentException e) { - throw new RuntimeException( - String.format( - "Invalid row does not contain field '%s'.", - String.join(".", fieldLevels.subList(0, index + 1))), - e); - } + private @Nullable Object getValue(@Nullable Row row, List fieldLevels, int index) { + if (row == null || fieldLevels.isEmpty()) { + return null; + } + Preconditions.checkState( + index < fieldLevels.size(), + "'%s' only goes %s levels deep. 
Invalid attempt to check for depth at level %s", + String.join(".", fieldLevels), + fieldLevels.size(), + index); - // base case: we've reached the target - if (index == fieldLevels.size() - 1) { - return val; - } + String currentField = fieldLevels.get(index); + Object val; + try { + val = row.getValue(currentField); + } catch (IllegalArgumentException e) { + throw new RuntimeException( + String.format( + "Invalid row does not contain field '%s'.", + String.join(".", fieldLevels.subList(0, index + 1))), + e); + } - return getValue((Row) val, fieldLevels, ++index); + // base case: we've reached the target + if (index == fieldLevels.size() - 1) { + return val; } -} \ No newline at end of file + + return getValue((Row) val, fieldLevels, ++index); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowStringInterpolatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowStringInterpolatorTest.java index 1be111e009de..bcdf7f6a394d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowStringInterpolatorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowStringInterpolatorTest.java @@ -32,160 +32,160 @@ /** Test class for {@link RowStringInterpolator}. 
*/ public class RowStringInterpolatorTest { - @Rule public ExpectedException thrown = ExpectedException.none(); - - private static final Schema DOUBLY_NESTED_ROW_SCHEMA = - Schema.builder() - .addStringField("doubly_nested_str") - .addInt32Field("doubly_nested_int") - .build(); - - private static final Schema NESTED_ROW_SCHEMA = - Schema.builder() - .addStringField("nested_str") - .addInt32Field("nested_int") - .addFloatField("nested_float") - .addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA) - .build(); - private static final Schema ROW_SCHEMA = - Schema.builder() - .addStringField("str") - .addBooleanField("bool") - .addInt32Field("int") - .addNullableInt32Field("nullable_int") - .addArrayField("arr_int", Schema.FieldType.INT32) - .addRowField("row", NESTED_ROW_SCHEMA) - .addNullableRowField("nullable_row", NESTED_ROW_SCHEMA) - .build(); - - @Test - public void testInvalidRowThrowsHelpfulError() { - String template = "foo {str}"; - RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA); - - Row invalidRow = Row.nullRow(Schema.builder().addNullableStringField("xyz").build()); - - thrown.expect(RuntimeException.class); - thrown.expectMessage("Invalid row does not contain field 'str'."); - - interpolator.interpolate(invalidRow); - } - - @Test - public void testInvalidRowThrowsHelpfulErrorForNestedFields() { - String template = "foo {row.nested_int}"; - RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA); - - Schema nestedSchema = Schema.builder().addNullableStringField("xyz").build(); - Row invalidRow = - Row.withSchema(Schema.builder().addNullableRowField("row", nestedSchema).build()) - .addValue(Row.nullRow(nestedSchema)) - .build(); - - thrown.expect(RuntimeException.class); - thrown.expectMessage("Invalid row does not contain field 'row.nested_int'."); - - interpolator.interpolate(invalidRow); - } - - @Test - public void testInvalidRowThrowsHelpfulErrorForDoublyNestedFields() { - String template = 
"foo {row.nested_row.doubly_nested_int}"; - RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA); - - Schema doublyNestedSchema = Schema.builder().addNullableStringField("xyz").build(); - Schema nestedSchema = - Schema.builder().addNullableRowField("nested_row", doublyNestedSchema).build(); - Row invalidRow = - Row.withSchema(Schema.builder().addNullableRowField("row", doublyNestedSchema).build()) - .addValue( - Row.withSchema(nestedSchema).addValue(Row.nullRow(doublyNestedSchema)).build()) - .build(); - - thrown.expect(RuntimeException.class); - thrown.expectMessage("Invalid row does not contain field 'row.nested_row.doubly_nested_int'."); - - interpolator.interpolate(invalidRow); - } - - private static final Row ROW = - Row.withSchema(ROW_SCHEMA) - .addValue("str_value") - .addValue(true) - .addValue(123) - .addValue(null) - .addValue(Arrays.asList(1, 2, 3, 4, 5)) - .addValue( - Row.withSchema(NESTED_ROW_SCHEMA) - .addValue("nested_str_value") - .addValue(456) - .addValue(1.234f) - .addValue( - Row.withSchema(DOUBLY_NESTED_ROW_SCHEMA) - .addValue("doubly_nested_str_value") - .addValue(789) - .build()) - .build()) - .addValue(null) - .build(); - - @Test - public void testTopLevelInterpolation() { - String template = "foo {str}, bar {bool}, baz {int}, xyz {nullable_int}"; - RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA); - - String output = interpolator.interpolate(ROW); - - assertEquals("foo str_value, bar true, baz 123, xyz ", output); - } - - @Test - public void testNestedLevelInterpolation() { - String template = "foo {str}, bar {row.nested_str}, baz {row.nested_float}"; - RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA); - - String output = interpolator.interpolate(ROW); - - assertEquals("foo str_value, bar nested_str_value, baz 1.234", output); - } - - @Test - public void testDoublyNestedInterpolation() { - String template = - "foo {str}, bar 
{row.nested_row.doubly_nested_str}, baz {row.nested_row.doubly_nested_int}"; - RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA); - - String output = interpolator.interpolate(ROW); - - assertEquals("foo str_value, bar doubly_nested_str_value, baz 789", output); - } - - @Test - public void testInterpolateWindowingInformation() { - String template = - String.format( - "str: {str}, window: {%s}, pane: {%s}, year: {%s}, month: {%s}, day: {%s}", - RowStringInterpolator.WINDOW, - RowStringInterpolator.PANE_INDEX, - RowStringInterpolator.YYYY, - RowStringInterpolator.MM, - RowStringInterpolator.DD); - - RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA); - - Instant instant = new DateTime(2024, 8, 28, 12, 0).toInstant(); - - String output = - interpolator.interpolate( - ROW, - GlobalWindow.INSTANCE, - PaneInfo.createPane(false, false, PaneInfo.Timing.ON_TIME, 2, 0), - instant); - String expected = - String.format( - "str: str_value, window: %s, pane: 2, year: 2024, month: 8, day: 28", - GlobalWindow.INSTANCE); - - assertEquals(expected, output); - } -} \ No newline at end of file + @Rule public ExpectedException thrown = ExpectedException.none(); + + private static final Schema DOUBLY_NESTED_ROW_SCHEMA = + Schema.builder() + .addStringField("doubly_nested_str") + .addInt32Field("doubly_nested_int") + .build(); + + private static final Schema NESTED_ROW_SCHEMA = + Schema.builder() + .addStringField("nested_str") + .addInt32Field("nested_int") + .addFloatField("nested_float") + .addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA) + .build(); + private static final Schema ROW_SCHEMA = + Schema.builder() + .addStringField("str") + .addBooleanField("bool") + .addInt32Field("int") + .addNullableInt32Field("nullable_int") + .addArrayField("arr_int", Schema.FieldType.INT32) + .addRowField("row", NESTED_ROW_SCHEMA) + .addNullableRowField("nullable_row", NESTED_ROW_SCHEMA) + .build(); + + @Test + public void 
testInvalidRowThrowsHelpfulError() { + String template = "foo {str}"; + RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA); + + Row invalidRow = Row.nullRow(Schema.builder().addNullableStringField("xyz").build()); + + thrown.expect(RuntimeException.class); + thrown.expectMessage("Invalid row does not contain field 'str'."); + + interpolator.interpolate(invalidRow); + } + + @Test + public void testInvalidRowThrowsHelpfulErrorForNestedFields() { + String template = "foo {row.nested_int}"; + RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA); + + Schema nestedSchema = Schema.builder().addNullableStringField("xyz").build(); + Row invalidRow = + Row.withSchema(Schema.builder().addNullableRowField("row", nestedSchema).build()) + .addValue(Row.nullRow(nestedSchema)) + .build(); + + thrown.expect(RuntimeException.class); + thrown.expectMessage("Invalid row does not contain field 'row.nested_int'."); + + interpolator.interpolate(invalidRow); + } + + @Test + public void testInvalidRowThrowsHelpfulErrorForDoublyNestedFields() { + String template = "foo {row.nested_row.doubly_nested_int}"; + RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA); + + Schema doublyNestedSchema = Schema.builder().addNullableStringField("xyz").build(); + Schema nestedSchema = + Schema.builder().addNullableRowField("nested_row", doublyNestedSchema).build(); + Row invalidRow = + Row.withSchema(Schema.builder().addNullableRowField("row", doublyNestedSchema).build()) + .addValue( + Row.withSchema(nestedSchema).addValue(Row.nullRow(doublyNestedSchema)).build()) + .build(); + + thrown.expect(RuntimeException.class); + thrown.expectMessage("Invalid row does not contain field 'row.nested_row.doubly_nested_int'."); + + interpolator.interpolate(invalidRow); + } + + private static final Row ROW = + Row.withSchema(ROW_SCHEMA) + .addValue("str_value") + .addValue(true) + .addValue(123) + .addValue(null) + 
.addValue(Arrays.asList(1, 2, 3, 4, 5)) + .addValue( + Row.withSchema(NESTED_ROW_SCHEMA) + .addValue("nested_str_value") + .addValue(456) + .addValue(1.234f) + .addValue( + Row.withSchema(DOUBLY_NESTED_ROW_SCHEMA) + .addValue("doubly_nested_str_value") + .addValue(789) + .build()) + .build()) + .addValue(null) + .build(); + + @Test + public void testTopLevelInterpolation() { + String template = "foo {str}, bar {bool}, baz {int}, xyz {nullable_int}"; + RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA); + + String output = interpolator.interpolate(ROW); + + assertEquals("foo str_value, bar true, baz 123, xyz ", output); + } + + @Test + public void testNestedLevelInterpolation() { + String template = "foo {str}, bar {row.nested_str}, baz {row.nested_float}"; + RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA); + + String output = interpolator.interpolate(ROW); + + assertEquals("foo str_value, bar nested_str_value, baz 1.234", output); + } + + @Test + public void testDoublyNestedInterpolation() { + String template = + "foo {str}, bar {row.nested_row.doubly_nested_str}, baz {row.nested_row.doubly_nested_int}"; + RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA); + + String output = interpolator.interpolate(ROW); + + assertEquals("foo str_value, bar doubly_nested_str_value, baz 789", output); + } + + @Test + public void testInterpolateWindowingInformation() { + String template = + String.format( + "str: {str}, window: {%s}, pane: {%s}, year: {%s}, month: {%s}, day: {%s}", + RowStringInterpolator.WINDOW, + RowStringInterpolator.PANE_INDEX, + RowStringInterpolator.YYYY, + RowStringInterpolator.MM, + RowStringInterpolator.DD); + + RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA); + + Instant instant = new DateTime(2024, 8, 28, 12, 0).toInstant(); + + String output = + interpolator.interpolate( + ROW, + GlobalWindow.INSTANCE, + 
PaneInfo.createPane(false, false, PaneInfo.Timing.ON_TIME, 2, 0), + instant); + String expected = + String.format( + "str: str_value, window: %s, pane: 2, year: 2024, month: 8, day: 28", + GlobalWindow.INSTANCE); + + assertEquals(expected, output); + } +} From 3a9aee001436ae0977e39a69fc1e9c984811b066 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Sun, 15 Sep 2024 16:47:33 -0400 Subject: [PATCH 3/4] address comments --- .../apache/beam/sdk/util/RowStringInterpolator.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowStringInterpolator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowStringInterpolator.java index a79ab89f518a..60195be7d820 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowStringInterpolator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowStringInterpolator.java @@ -78,18 +78,29 @@ public class RowStringInterpolator implements Serializable { private final String template; private final Set fieldsToReplace; + // Represents the string representation of the element's window public static final String WINDOW = "$WINDOW"; + // Represents the element's pane index public static final String PANE_INDEX = "$PANE_INDEX"; public static final String YYYY = "$YYYY"; public static final String MM = "$MM"; public static final String DD = "$DD"; private static final Set WINDOWING_METADATA = Sets.newHashSet(WINDOW, PANE_INDEX, YYYY, MM, DD); + private static final Pattern TEMPLATE_PATTERN = Pattern.compile("\\{(.+?)}"); + /** + * @param template a String template, potentially with placeholders in the form of curly braces, + * e.g. {@code "my {foo} template"}. During interpolation, these placeholders are replaced + * with values in the Beam Row. For more details and examples, refer to the top-level + * documentation.
+ * @param rowSchema {@link Row}s used for interpolation are expected to be compatible with this + * {@link Schema}. + */ public RowStringInterpolator(String template, Schema rowSchema) { this.template = template; - Matcher m = Pattern.compile("\\{(.+?)}").matcher(template); + Matcher m = TEMPLATE_PATTERN.matcher(template); fieldsToReplace = new HashSet<>(); while (m.find()) { fieldsToReplace.add(StringUtils.strip(m.group(), "{}")); From 4abf0aadb7fa7eb0aeb93c82bbb8f0bbc5d7bafd Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 18 Sep 2024 15:40:33 -0400 Subject: [PATCH 4/4] address comments --- .../beam/sdk/util/RowStringInterpolator.java | 65 ++++++------------- .../sdk/util/RowStringInterpolatorTest.java | 22 +++---- 2 files changed, 30 insertions(+), 57 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowStringInterpolator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowStringInterpolator.java index 60195be7d820..1f095522dd8b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowStringInterpolator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowStringInterpolator.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.util; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + import java.io.Serializable; import java.util.HashSet; import java.util.List; @@ -30,9 +32,7 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; -import org.apache.commons.lang3.StringUtils; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; @@ -55,7 +55,7 @@ * Row inputRow = {foo: "batch", bar: {baz: "streaming"}, ...}; * * 
RowStringInterpolator interpolator = new RowStringInterpolator(template, beamSchema); - * String output = interpolator.interpolate(inputRow); + * String output = interpolator.interpolate(inputRow, window, paneInfo, timestamp); * // output --> "unified batch and streaming!" * } * @@ -71,7 +71,7 @@ *
<li>$DD: the element timestamp's day * </ul>
* - *

<p>For example, your Sting template can look like: + *

<p>For example, your String template can look like: * *

<pre>{@code "unified {foo} and {bar} since {$YYYY}-{$MM}!"}</pre>
*/ @@ -103,7 +103,7 @@ public RowStringInterpolator(String template, Schema rowSchema) { Matcher m = TEMPLATE_PATTERN.matcher(template); fieldsToReplace = new HashSet<>(); while (m.find()) { - fieldsToReplace.add(StringUtils.strip(m.group(), "{}")); + fieldsToReplace.add(checkStateNotNull(m.group(1))); } List rowFields = @@ -114,20 +114,10 @@ public RowStringInterpolator(String template, Schema rowSchema) { RowFilter.validateSchemaContainsFields(rowSchema, rowFields, "string interpolation"); } - /** Performs string interpolation on the template using values from the input {@link Row}. */ - public String interpolate(Row row) { - String interpolated = this.template; - for (String field : fieldsToReplace) { - List levels = Splitter.on(".").splitToList(field); - - Object val = MoreObjects.firstNonNull(getValue(row, levels, 0), ""); - - interpolated = interpolated.replace("{" + field + "}", String.valueOf(val)); - } - return interpolated; - } - - /** Like {@link #interpolate(Row)} but also potentially include windowing information. */ + /** + * Performs string interpolation on the template using values from the input {@link Row} and its + * windowing metadata. 
+ */ public String interpolate(Row row, BoundedWindow window, PaneInfo paneInfo, Instant timestamp) { String interpolated = this.template; for (String field : fieldsToReplace) { @@ -149,8 +139,7 @@ public String interpolate(Row row, BoundedWindow window, PaneInfo paneInfo, Inst val = timestamp.getChronology().dayOfMonth().get(timestamp.getMillis()); break; default: - List levels = Splitter.on(".").splitToList(field); - val = MoreObjects.firstNonNull(getValue(row, levels, 0), ""); + val = MoreObjects.firstNonNull(getValue(row, field), ""); break; } @@ -159,34 +148,18 @@ public String interpolate(Row row, BoundedWindow window, PaneInfo paneInfo, Inst return interpolated; } - private @Nullable Object getValue(@Nullable Row row, List fieldLevels, int index) { - if (row == null || fieldLevels.isEmpty()) { + private @Nullable Object getValue(@Nullable Row row, String fieldPath) { + if (row == null) { return null; } - Preconditions.checkState( - index < fieldLevels.size(), - "'%s' only goes %s levels deep. Invalid attempt to check for depth at level %s", - String.join(".", fieldLevels), - fieldLevels.size(), - index); + int dotIndex = fieldPath.indexOf('.'); + String field = dotIndex == -1 ? 
fieldPath : fieldPath.substring(0, dotIndex); + Preconditions.checkArgument( + row.getSchema().hasField(field), "Invalid row does not contain field '%s'.", field); - String currentField = fieldLevels.get(index); - Object val; - try { - val = row.getValue(currentField); - } catch (IllegalArgumentException e) { - throw new RuntimeException( - String.format( - "Invalid row does not contain field '%s'.", - String.join(".", fieldLevels.subList(0, index + 1))), - e); + if (dotIndex == -1) { + return row.getValue(field); } - - // base case: we've reached the target - if (index == fieldLevels.size() - 1) { - return val; - } - - return getValue((Row) val, fieldLevels, ++index); + return getValue(row.getRow(field), fieldPath.substring(dotIndex + 1)); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowStringInterpolatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowStringInterpolatorTest.java index bcdf7f6a394d..0b1295c38533 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowStringInterpolatorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowStringInterpolatorTest.java @@ -65,10 +65,10 @@ public void testInvalidRowThrowsHelpfulError() { Row invalidRow = Row.nullRow(Schema.builder().addNullableStringField("xyz").build()); - thrown.expect(RuntimeException.class); + thrown.expect(IllegalArgumentException.class); thrown.expectMessage("Invalid row does not contain field 'str'."); - interpolator.interpolate(invalidRow); + interpolator.interpolate(invalidRow, null, null, null); } @Test @@ -82,10 +82,10 @@ public void testInvalidRowThrowsHelpfulErrorForNestedFields() { .addValue(Row.nullRow(nestedSchema)) .build(); - thrown.expect(RuntimeException.class); - thrown.expectMessage("Invalid row does not contain field 'row.nested_int'."); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid row does not contain field 'nested_int'."); - interpolator.interpolate(invalidRow); + 
interpolator.interpolate(invalidRow, null, null, null); } @Test @@ -102,10 +102,10 @@ public void testInvalidRowThrowsHelpfulErrorForDoublyNestedFields() { Row.withSchema(nestedSchema).addValue(Row.nullRow(doublyNestedSchema)).build()) .build(); - thrown.expect(RuntimeException.class); - thrown.expectMessage("Invalid row does not contain field 'row.nested_row.doubly_nested_int'."); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid row does not contain field 'doubly_nested_int'."); - interpolator.interpolate(invalidRow); + interpolator.interpolate(invalidRow, null, null, null); } private static final Row ROW = @@ -134,7 +134,7 @@ public void testTopLevelInterpolation() { String template = "foo {str}, bar {bool}, baz {int}, xyz {nullable_int}"; RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA); - String output = interpolator.interpolate(ROW); + String output = interpolator.interpolate(ROW, null, null, null); assertEquals("foo str_value, bar true, baz 123, xyz ", output); } @@ -144,7 +144,7 @@ public void testNestedLevelInterpolation() { String template = "foo {str}, bar {row.nested_str}, baz {row.nested_float}"; RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA); - String output = interpolator.interpolate(ROW); + String output = interpolator.interpolate(ROW, null, null, null); assertEquals("foo str_value, bar nested_str_value, baz 1.234", output); } @@ -155,7 +155,7 @@ public void testDoublyNestedInterpolation() { "foo {str}, bar {row.nested_row.doubly_nested_str}, baz {row.nested_row.doubly_nested_int}"; RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA); - String output = interpolator.interpolate(ROW); + String output = interpolator.interpolate(ROW, null, null, null); assertEquals("foo str_value, bar doubly_nested_str_value, baz 789", output); }