From dd15d59fc82fb50d3cd511370bd78034c6a99074 Mon Sep 17 00:00:00 2001 From: Vitaly Terentyev Date: Sat, 29 May 2021 01:45:09 +0300 Subject: [PATCH] [BEAM-621] Add MapKeys, MapValues transforms (#14273) * [BEAM-621] Add MapKeys and MapValues PTransforms * [BEAM-621] Add MapToKeys and MapToValues PTransforms * [BEAM-621] Fix code style * [BEAM-621] Set Coder in tests * [BEAM-621] Fix assertion in testMapValues * [BEAM-621] Fix type variable names according to the CheckStyle * [BEAM-621] Add Apache License header in tests * [BEAM-621] Fix violations according to Spotless * [BEAM-621] Fix violations according to Spotless * [BEAM-621] Fix violations according to Spotless * [BEAM-621] Remove SuppressWarnings in tests * [BEAM-621] Use ImmutableList.of() instead of double-brace initialization * [BEAM-621] Remove redundant classes, fix javadoc * [BEAM-621] Handle exceptions in MapKeys and MapValues * [BEAM-621] Fix CheckStyle * [BEAM-621] Fix Spotless * [BEAM-621] Fix nullness issues --- .../apache/beam/sdk/transforms/MapKeys.java | 163 +++++++++++++++++ .../apache/beam/sdk/transforms/MapValues.java | 164 ++++++++++++++++++ .../sdk/transforms/SimpleMapWithFailures.java | 75 ++++++++ .../beam/sdk/transforms/MapKeysTest.java | 119 +++++++++++++ .../beam/sdk/transforms/MapValuesTest.java | 119 +++++++++++++ 5 files changed, 640 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapKeys.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapValues.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleMapWithFailures.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapKeysTest.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapValuesTest.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapKeys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapKeys.java 
new file mode 100644 index 000000000000..579bbc6663e7 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapKeys.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; + +import org.apache.beam.sdk.transforms.Contextful.Fn; +import org.apache.beam.sdk.transforms.WithFailures.ExceptionElement; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeParameter; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.checkerframework.checker.nullness.qual.RequiresNonNull; + +/** + * {@code MapKeys} maps a {@code SerializableFunction} over keys of a {@code + * PCollection>} and returns a {@code PCollection>}. + * + *

Example of use: + * + *

{@code
+ * PCollection<KV<Integer, String>> input = ...;
+ * PCollection<KV<Double, String>> output =
+ *      input.apply(MapKeys.into(TypeDescriptors.doubles()).via(Integer::doubleValue));
+ * }
+ * + *

See also {@link MapValues}. + * + * @param the type of the keys in the input {@code PCollection} + * @param the type of the keys in the output {@code PCollection} + */ +public class MapKeys extends PTransform>, PCollection>> { + + private final transient TypeDescriptor outputType; + private final @Nullable Contextful, KV>> fn; + + /** + * Returns a {@code MapKeys} {@code PTransform} for a {@code ProcessFunction} with predefined {@link #outputType}. + * + * @param the type of the keys in the input {@code PCollection} + * @param the type of the values in the input and output {@code PCollection}s + */ + public MapKeys via( + SerializableFunction fn) { + return new MapKeys<>( + Contextful.fn( + ((element, c) -> KV.of(fn.apply(element.getKey()), element.getValue())), + Requirements.empty()), + outputType); + } + + /** + * Returns a new {@link MapKeys} transform with the given type descriptor for the output type, but + * the mapping function yet to be specified using {@link #via(SerializableFunction)}. + */ + public static MapKeys into(final TypeDescriptor outputType) { + return new MapKeys<>(null, outputType); + } + + private MapKeys( + @Nullable Contextful, KV>> fn, TypeDescriptor outputType) { + this.fn = fn; + this.outputType = outputType; + } + + /** + * Returns a new {@link SimpleMapWithFailures} transform that catches exceptions raised while + * mapping elements, with the given type descriptor used for the failure collection but the + * exception handler yet to be specified using {@link + * SimpleMapWithFailures#exceptionsVia(ProcessFunction)}. + * + *

See {@link WithFailures} documentation for usage patterns of the returned {@link + * WithFailures.Result}. + * + *

Example usage: + * + *

{@code
+   * Result<PCollection<KV<Integer, String>>, String> result =
+   *         input.apply(
+   *             MapKeys.into(TypeDescriptors.integers())
+   *                 .<String, String>via(word -> 1 / word.length())  // Could throw ArithmeticException
+   *                 .exceptionsInto(TypeDescriptors.strings())
+   *                 .exceptionsVia(ee -> ee.exception().getMessage()));
+   * PCollection<KV<Integer, String>> output = result.output();
+   * PCollection<String> failures = result.failures();
+   * }
+ */ + @RequiresNonNull("fn") + public SimpleMapWithFailures, KV, FailureT> exceptionsInto( + TypeDescriptor failureTypeDescriptor) { + return new SimpleMapWithFailures<>( + "MapKeysWithFailures", fn, getKvTypeDescriptor(), null, failureTypeDescriptor); + } + + /** + * Returns a new {@link SimpleMapWithFailures} transform that catches exceptions raised while + * mapping elements, passing the raised exception instance and the input element being processed + * through the given {@code exceptionHandler} and emitting the result to a failure collection. + * + *

This method takes advantage of the type information provided by {@link InferableFunction}, + * meaning that a call to {@link #exceptionsInto(TypeDescriptor)} may not be necessary. + * + *

See {@link WithFailures} documentation for usage patterns of the returned {@link + * WithFailures.Result}. + * + *

Example usage: + * + *

{@code
+   * Result<PCollection<KV<Integer, String>>, String> result =
+   *         input.apply(
+   *             MapKeys.into(TypeDescriptors.integers())
+   *                 .<String, String>via(word -> 1 / word.length())  // Could throw ArithmeticException
+   *                 .exceptionsVia(
+   *                     new InferableFunction<ExceptionElement<KV<String, String>>, String>() {
+   *                       @Override
+   *                       public String apply(ExceptionElement<KV<String, String>> input) {
+   *                         return input.exception().getMessage();
+   *                       }
+   *                     }));
+   * PCollection<KV<Integer, String>> output = result.output();
+   * PCollection<String> failures = result.failures();
+   * }
+ */ + @RequiresNonNull("fn") + public SimpleMapWithFailures, KV, FailureT> exceptionsVia( + InferableFunction>, FailureT> exceptionHandler) { + return new SimpleMapWithFailures<>( + "MapKeysWithFailures", + fn, + getKvTypeDescriptor(), + exceptionHandler, + exceptionHandler.getOutputTypeDescriptor()); + } + + @Override + public PCollection> expand(PCollection> input) { + return input.apply( + "MapKeys", + MapElements.into(getKvTypeDescriptor()) + .via(checkNotNull(fn, "Must specify a function on MapKeys using .via()"))); + } + + private TypeDescriptor> getKvTypeDescriptor() { + return new TypeDescriptor>() {}.where(new TypeParameter() {}, outputType); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapValues.java new file mode 100644 index 000000000000..ae1fe8ea6a45 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapValues.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; + +import org.apache.beam.sdk.transforms.Contextful.Fn; +import org.apache.beam.sdk.transforms.WithFailures.ExceptionElement; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeParameter; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.checkerframework.checker.nullness.qual.RequiresNonNull; + +/** + * {@code MapValues} maps a {@code SerializableFunction} over values of a {@code + * PCollection>} and returns a {@code PCollection>}. + * + *

Example of use: + * + *

{@code
+ * PCollection<KV<String, Integer>> input = ...;
+ * PCollection<KV<String, Double>> output =
+ *      input.apply(MapValues.into(TypeDescriptors.doubles()).via(Integer::doubleValue));
+ * }
+ * + *

See also {@link MapKeys}. + * + * @param the type of the values in the input {@code PCollection} + * @param the type of the elements in the output {@code PCollection} + */ +public class MapValues + extends PTransform>, PCollection>> { + + private final transient TypeDescriptor outputType; + private final @Nullable Contextful, KV>> fn; + + /** + * Returns a {@link MapValues} transform for a {@code ProcessFunction} with predefined + * {@link #outputType}. + * + * @param the type of the keys in the input and output {@code PCollection}s + * @param the type of the values in the input {@code PCollection} + */ + public MapValues via( + SerializableFunction fn) { + return new MapValues<>( + Contextful.fn( + ((element, c) -> KV.of(element.getKey(), fn.apply(element.getValue()))), + Requirements.empty()), + outputType); + } + + /** + * Returns a new {@link MapValues} transform with the given type descriptor for the output type, + * but the mapping function yet to be specified using {@link #via(SerializableFunction)}. + */ + public static MapValues into(final TypeDescriptor outputType) { + return new MapValues<>(null, outputType); + } + + private MapValues( + @Nullable Contextful, KV>> fn, TypeDescriptor outputType) { + this.fn = fn; + this.outputType = outputType; + } + + /** + * Returns a new {@link SimpleMapWithFailures} transform that catches exceptions raised while + * mapping elements, with the given type descriptor used for the failure collection but the + * exception handler yet to be specified using {@link + * SimpleMapWithFailures#exceptionsVia(ProcessFunction)}. + * + *

See {@link WithFailures} documentation for usage patterns of the returned {@link + * WithFailures.Result}. + * + *

Example usage: + * + *

{@code
+   * Result<PCollection<KV<String, Integer>>, String> result =
+   *         input.apply(
+   *             MapValues.into(TypeDescriptors.integers())
+   *                 .<String, String>via(word -> 1 / word.length())  // Could throw ArithmeticException
+   *                 .exceptionsInto(TypeDescriptors.strings())
+   *                 .exceptionsVia(ee -> ee.exception().getMessage()));
+   * PCollection<KV<String, Integer>> output = result.output();
+   * PCollection<String> failures = result.failures();
+   * }
+ */ + @RequiresNonNull("fn") + public SimpleMapWithFailures, KV, FailureT> exceptionsInto( + TypeDescriptor failureTypeDescriptor) { + return new SimpleMapWithFailures<>( + "MapValuesWithFailures", fn, getKvTypeDescriptor(), null, failureTypeDescriptor); + } + + /** + * Returns a new {@link SimpleMapWithFailures} transform that catches exceptions raised while + * mapping elements, passing the raised exception instance and the input element being processed + * through the given {@code exceptionHandler} and emitting the result to a failure collection. + * + *

This method takes advantage of the type information provided by {@link InferableFunction}, + * meaning that a call to {@link #exceptionsInto(TypeDescriptor)} may not be necessary. + * + *

See {@link WithFailures} documentation for usage patterns of the returned {@link + * WithFailures.Result}. + * + *

Example usage: + * + *

{@code
+   * Result<PCollection<KV<String, Integer>>, String> result =
+   *         input.apply(
+   *             MapValues.into(TypeDescriptors.integers())
+   *                 .<String, String>via(word -> 1 / word.length())  // Could throw ArithmeticException
+   *                 .exceptionsVia(
+   *                     new InferableFunction<ExceptionElement<KV<String, String>>, String>() {
+   *                       @Override
+   *                       public String apply(ExceptionElement<KV<String, String>> input) {
+   *                         return input.exception().getMessage();
+   *                       }
+   *                     }));
+   * PCollection<KV<String, Integer>> output = result.output();
+   * PCollection<String> failures = result.failures();
+   * }
+ */ + @RequiresNonNull("fn") + public SimpleMapWithFailures, KV, FailureT> exceptionsVia( + InferableFunction>, FailureT> exceptionHandler) { + return new SimpleMapWithFailures<>( + "MapValuesWithFailures", + fn, + getKvTypeDescriptor(), + exceptionHandler, + exceptionHandler.getOutputTypeDescriptor()); + } + + @Override + public PCollection> expand(PCollection> input) { + return input.apply( + "MapValues", + MapElements.into(getKvTypeDescriptor()) + .via(checkNotNull(fn, "Must specify a function on MapValues using .via()"))); + } + + private TypeDescriptor> getKvTypeDescriptor() { + return new TypeDescriptor>() {}.where(new TypeParameter() {}, outputType); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleMapWithFailures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleMapWithFailures.java new file mode 100644 index 000000000000..8d58f36eff9b --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleMapWithFailures.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms; + +import org.apache.beam.sdk.transforms.Contextful.Fn; +import org.apache.beam.sdk.transforms.WithFailures.ExceptionElement; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * A {@code PTransform} that adds exception handling to {@link MapKeys} and {@link MapValues} using + * {@link MapElements.MapWithFailures}. + */ +class SimpleMapWithFailures + extends PTransform, WithFailures.Result, FailureT>> { + + private final transient TypeDescriptor outputType; + private final Contextful> fn; + private final transient TypeDescriptor failureType; + private final @Nullable ProcessFunction, FailureT> exceptionHandler; + private final String transformName; + + SimpleMapWithFailures( + String transformName, + Contextful> fn, + TypeDescriptor outputType, + @Nullable ProcessFunction, FailureT> exceptionHandler, + TypeDescriptor failureType) { + this.transformName = transformName; + this.fn = fn; + this.outputType = outputType; + this.exceptionHandler = exceptionHandler; + this.failureType = failureType; + } + + @Override + public WithFailures.Result, FailureT> expand(PCollection input) { + if (exceptionHandler == null) { + throw new NullPointerException(".exceptionsVia() is required"); + } + return input.apply( + transformName, + MapElements.into(outputType) + .via(fn) + .exceptionsInto(failureType) + .exceptionsVia(exceptionHandler)); + } + + /** + * Returns a {@code PTransform} that catches exceptions raised while mapping elements, passing the + * raised exception instance and the input element being processed through the given {@code + * exceptionHandler} and emitting the result to a failure collection. 
+ */ + public SimpleMapWithFailures exceptionsVia( + ProcessFunction, FailureT> exceptionHandler) { + return new SimpleMapWithFailures<>( + transformName, fn, outputType, exceptionHandler, failureType); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapKeysTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapKeysTest.java new file mode 100644 index 000000000000..659388b133ea --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapKeysTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.DoubleCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link MapKeys} transform. */ +@RunWith(JUnit4.class) +public class MapKeysTest { + + private static final List> TABLE = + ImmutableList.of(KV.of(1, "one"), KV.of(2, "two"), KV.of(3, "none")); + private static final List> WORDS_TABLE = + ImmutableList.of( + KV.of("one", "Length = 3"), KV.of("three", "Length = 4"), KV.of("", "Length = 0")); + + private static final List> EMPTY_TABLE = new ArrayList<>(); + public static final String EXPECTED_FAILURE_MESSAGE = "/ by zero"; + + @Rule public final TestPipeline p = TestPipeline.create(); + + @Test + @Category(NeedsRunner.class) + public void testMapKeysInto() { + + PCollection> input = + p.apply( + Create.of(TABLE) + .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of()))); + + PCollection> output = + input + .apply( + MapKeys.into(TypeDescriptors.doubles()) + .via((SerializableFunction) input1 -> input1 * 2d)) + .setCoder(KvCoder.of(DoubleCoder.of(), StringUtf8Coder.of())); + + PAssert.that(output) + .containsInAnyOrder( + ImmutableList.of(KV.of(2.0d, "one"), KV.of(4.0d, "two"), KV.of(6.0d, "none"))); + + p.run(); + } + + @Test + @Category(NeedsRunner.class) + 
public void testMapKeysWithFailures() { + + PCollection> input = + p.apply( + Create.of(WORDS_TABLE) + .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))); + + WithFailures.Result>, String> result = + input.apply( + MapKeys.into(TypeDescriptors.integers()) + .via(word -> 1 / word.length()) + .exceptionsInto(TypeDescriptors.strings()) + .exceptionsVia(ee -> ee.exception().getMessage())); + result.output().setCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of())); + + PAssert.that(result.output()) + .containsInAnyOrder(ImmutableList.of(KV.of(0, "Length = 3"), KV.of(0, "Length = 4"))); + PAssert.that(result.failures()).containsInAnyOrder(EXPECTED_FAILURE_MESSAGE); + + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testMapKeysEmpty() { + + PCollection> input = + p.apply( + Create.of(EMPTY_TABLE) + .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of()))); + + PCollection> output = + input + .apply(MapKeys.into(TypeDescriptors.doubles()).via(Integer::doubleValue)) + .setCoder(KvCoder.of(DoubleCoder.of(), StringUtf8Coder.of())); + + PAssert.that(output).empty(); + + p.run(); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapValuesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapValuesTest.java new file mode 100644 index 000000000000..e32d6e9ca600 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapValuesTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.DoubleCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link MapValues} transform. 
*/ +@RunWith(JUnit4.class) +public class MapValuesTest { + + private static final List> TABLE = + ImmutableList.of(KV.of("one", 1), KV.of("two", 2), KV.of("dup", 2)); + private static final List> WORDS_TABLE = + ImmutableList.of( + KV.of("Length = 3", "one"), KV.of("Length = 4", "three"), KV.of("Length = 0", "")); + + private static final List> EMPTY_TABLE = new ArrayList<>(); + public static final String EXPECTED_FAILURE_MESSAGE = "/ by zero"; + + @Rule public final TestPipeline p = TestPipeline.create(); + + @Test + @Category(NeedsRunner.class) + public void testMapValuesInto() { + + PCollection> input = + p.apply( + Create.of(TABLE) + .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); + + PCollection> output = + input + .apply( + MapValues.into(TypeDescriptors.doubles()) + .via((SerializableFunction) input1 -> input1 * 2d)) + .setCoder(KvCoder.of(StringUtf8Coder.of(), DoubleCoder.of())); + + PAssert.that(output) + .containsInAnyOrder( + ImmutableList.of(KV.of("one", 2.0d), KV.of("two", 4.0d), KV.of("dup", 4.0d))); + + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testMapValuesWithFailures() { + + PCollection> input = + p.apply( + Create.of(WORDS_TABLE) + .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))); + + WithFailures.Result>, String> result = + input.apply( + MapValues.into(TypeDescriptors.integers()) + .via(word -> 1 / word.length()) + .exceptionsInto(TypeDescriptors.strings()) + .exceptionsVia(ee -> ee.exception().getMessage())); + result.output().setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())); + + PAssert.that(result.output()) + .containsInAnyOrder(ImmutableList.of(KV.of("Length = 3", 0), KV.of("Length = 4", 0))); + PAssert.that(result.failures()).containsInAnyOrder(EXPECTED_FAILURE_MESSAGE); + + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testMapValuesEmpty() { + + PCollection> input = + p.apply( + Create.of(EMPTY_TABLE) + 
.withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); + + PCollection> output = + input + .apply(MapValues.into(TypeDescriptors.doubles()).via(Integer::doubleValue)) + .setCoder(KvCoder.of(StringUtf8Coder.of(), DoubleCoder.of())); + + PAssert.that(output).empty(); + + p.run(); + } +}