Skip to content

Commit

Permalink
[BEAM-621] Add MapKeys, MapValues transforms (#14273)
Browse files Browse the repository at this point in the history
* [BEAM-621] Add MapKeys and MapValues PTransforms

* [BEAM-621] Add MapToKeys and MapToValues PTransforms

* [BEAM-621] Fix code style

* [BEAM-621] Set Coder in tests

* [BEAM-621] Fix assertion in testMapValues

* [BEAM-621] Fix type variable names according to the CheckStyle

* [BEAM-621] Add Apache License header in tests

* [BEAM-621] Fix violations according to Spotless

* [BEAM-621] Fix violations according to Spotless

* [BEAM-621] Fix violations according to Spotless

* [BEAM-621] Remove SuppressWarnings in tests

* [BEAM-621] Use ImmutableList.of() instead of double-brace initialization

* [BEAM-621] Remove redundant classes, fix javadoc

* [BEAM-621] Handle exceptions in MapKeys and MapValues

* [BEAM-621] Fix CheckStyle

* [BEAM-621] Fix Spotless

* [BEAM-621] Fix nullness issues
  • Loading branch information
Amar3tto authored May 28, 2021
1 parent 79d4c03 commit dd15d59
Show file tree
Hide file tree
Showing 5 changed files with 640 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.transforms;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;

import org.apache.beam.sdk.transforms.Contextful.Fn;
import org.apache.beam.sdk.transforms.WithFailures.ExceptionElement;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeParameter;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.RequiresNonNull;

/**
* {@code MapKeys} maps a {@code SerializableFunction<K1,K2>} over keys of a {@code
* PCollection<KV<K1,V>>} and returns a {@code PCollection<KV<K2, V>>}.
*
* <p>Example of use:
*
* <pre>{@code
* PCollection<KV<Integer, String>> input = ...;
* PCollection<KV<Double, String> output =
* input.apply(MapKeys.into(TypeDescriptors.doubles()).via(Integer::doubleValue));
* }</pre>
*
* <p>See also {@link MapValues}.
*
* @param <K1> the type of the keys in the input {@code PCollection}
* @param <K2> the type of the keys in the output {@code PCollection}
*/
public class MapKeys<K1, K2, V> extends PTransform<PCollection<KV<K1, V>>, PCollection<KV<K2, V>>> {

private final transient TypeDescriptor<K2> outputType;
private final @Nullable Contextful<Fn<KV<K1, V>, KV<K2, V>>> fn;

/**
* Returns a {@code MapKeys<K1, K2, V>} {@code PTransform} for a {@code ProcessFunction<NewK1,
* K2>} with predefined {@link #outputType}.
*
* @param <NewKeyT> the type of the keys in the input {@code PCollection}
* @param <NewValueT> the type of the values in the input and output {@code PCollection}s
*/
public <NewValueT, NewKeyT> MapKeys<NewKeyT, K2, NewValueT> via(
SerializableFunction<NewKeyT, K2> fn) {
return new MapKeys<>(
Contextful.fn(
((element, c) -> KV.of(fn.apply(element.getKey()), element.getValue())),
Requirements.empty()),
outputType);
}

/**
* Returns a new {@link MapKeys} transform with the given type descriptor for the output type, but
* the mapping function yet to be specified using {@link #via(SerializableFunction)}.
*/
public static <K2> MapKeys<?, K2, ?> into(final TypeDescriptor<K2> outputType) {
return new MapKeys<>(null, outputType);
}

private MapKeys(
@Nullable Contextful<Fn<KV<K1, V>, KV<K2, V>>> fn, TypeDescriptor<K2> outputType) {
this.fn = fn;
this.outputType = outputType;
}

/**
* Returns a new {@link SimpleMapWithFailures} transform that catches exceptions raised while
* mapping elements, with the given type descriptor used for the failure collection but the
* exception handler yet to be specified using {@link
* SimpleMapWithFailures#exceptionsVia(ProcessFunction)}.
*
* <p>See {@link WithFailures} documentation for usage patterns of the returned {@link
* WithFailures.Result}.
*
* <p>Example usage:
*
* <pre>{@code
* Result<PCollection<KV<Integer, String>>, String> result =
* input.apply(
* MapKeys.into(TypeDescriptors.integers())
* .<String, String>via(word -> 1 / word.length) // Could throw ArithmeticException
* .exceptionsInto(TypeDescriptors.strings())
* .exceptionsVia(ee -> ee.exception().getMessage()));
* PCollection<KV<Integer, String>> output = result.output();
* PCollection<String> failures = result.failures();
* }</pre>
*/
@RequiresNonNull("fn")
public <FailureT> SimpleMapWithFailures<KV<K1, V>, KV<K2, V>, FailureT> exceptionsInto(
TypeDescriptor<FailureT> failureTypeDescriptor) {
return new SimpleMapWithFailures<>(
"MapKeysWithFailures", fn, getKvTypeDescriptor(), null, failureTypeDescriptor);
}

/**
* Returns a new {@link SimpleMapWithFailures} transform that catches exceptions raised while
* mapping elements, passing the raised exception instance and the input element being processed
* through the given {@code exceptionHandler} and emitting the result to a failure collection.
*
* <p>This method takes advantage of the type information provided by {@link InferableFunction},
* meaning that a call to {@link #exceptionsInto(TypeDescriptor)} may not be necessary.
*
* <p>See {@link WithFailures} documentation for usage patterns of the returned {@link
* WithFailures.Result}.
*
* <p>Example usage:
*
* <pre>{@code
* Result<PCollection<KV<Integer, String>>, String> result =
* input.apply(
* MapKeys.into(TypeDescriptors.integers())
* .<String, String>via(word -> 1 / word.length) // Could throw ArithmeticException
* .exceptionsVia(
* new InferableFunction<ExceptionElement<KV<String, String>>, String>() {
* @Override
* public String apply(ExceptionElement<KV<String, String>> input) {
* return input.exception().getMessage();
* }
* }));
* PCollection<KV<Integer, String>> output = result.output();
* PCollection<String> failures = result.failures();
* }</pre>
*/
@RequiresNonNull("fn")
public <FailureT> SimpleMapWithFailures<KV<K1, V>, KV<K2, V>, FailureT> exceptionsVia(
InferableFunction<ExceptionElement<KV<K1, V>>, FailureT> exceptionHandler) {
return new SimpleMapWithFailures<>(
"MapKeysWithFailures",
fn,
getKvTypeDescriptor(),
exceptionHandler,
exceptionHandler.getOutputTypeDescriptor());
}

@Override
public PCollection<KV<K2, V>> expand(PCollection<KV<K1, V>> input) {
return input.apply(
"MapKeys",
MapElements.into(getKvTypeDescriptor())
.via(checkNotNull(fn, "Must specify a function on MapKeys using .via()")));
}

private TypeDescriptor<KV<K2, V>> getKvTypeDescriptor() {
return new TypeDescriptor<KV<K2, V>>() {}.where(new TypeParameter<K2>() {}, outputType);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.transforms;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;

import org.apache.beam.sdk.transforms.Contextful.Fn;
import org.apache.beam.sdk.transforms.WithFailures.ExceptionElement;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeParameter;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.RequiresNonNull;

/**
* {@code MapValues} maps a {@code SerializableFunction<V1,V2>} over values of a {@code
* PCollection<KV<K,V1>>} and returns a {@code PCollection<KV<K, V2>>}.
*
* <p>Example of use:
*
* <pre>{@code
* PCollection<KV<String, Integer>> input = ...;
* PCollection<KV<String, Double> output =
* input.apply(MapValues.into(TypeDescriptors.doubles()).via(Integer::doubleValue));
* }</pre>
*
* <p>See also {@link MapKeys}.
*
* @param <V1> the type of the values in the input {@code PCollection}
* @param <V2> the type of the elements in the output {@code PCollection}
*/
public class MapValues<K, V1, V2>
extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, V2>>> {

private final transient TypeDescriptor<V2> outputType;
private final @Nullable Contextful<Fn<KV<K, V1>, KV<K, V2>>> fn;

/**
* Returns a {@link MapValues} transform for a {@code ProcessFunction<NewV1, V2>} with predefined
* {@link #outputType}.
*
* @param <NewKeyT> the type of the keys in the input and output {@code PCollection}s
* @param <NewValueT> the type of the values in the input {@code PCollection}
*/
public <NewKeyT, NewValueT> MapValues<NewKeyT, NewValueT, V2> via(
SerializableFunction<NewValueT, V2> fn) {
return new MapValues<>(
Contextful.fn(
((element, c) -> KV.of(element.getKey(), fn.apply(element.getValue()))),
Requirements.empty()),
outputType);
}

/**
* Returns a new {@link MapValues} transform with the given type descriptor for the output type,
* but the mapping function yet to be specified using {@link #via(SerializableFunction)}.
*/
public static <V2> MapValues<?, ?, V2> into(final TypeDescriptor<V2> outputType) {
return new MapValues<>(null, outputType);
}

private MapValues(
@Nullable Contextful<Fn<KV<K, V1>, KV<K, V2>>> fn, TypeDescriptor<V2> outputType) {
this.fn = fn;
this.outputType = outputType;
}

/**
* Returns a new {@link SimpleMapWithFailures} transform that catches exceptions raised while
* mapping elements, with the given type descriptor used for the failure collection but the
* exception handler yet to be specified using {@link
* SimpleMapWithFailures#exceptionsVia(ProcessFunction)}.
*
* <p>See {@link WithFailures} documentation for usage patterns of the returned {@link
* WithFailures.Result}.
*
* <p>Example usage:
*
* <pre>{@code
* Result<PCollection<KV<String, Integer>>, String> result =
* input.apply(
* MapValues.into(TypeDescriptors.integers())
* .<String, String>via(word -> 1 / word.length) // Could throw ArithmeticException
* .exceptionsInto(TypeDescriptors.strings())
* .exceptionsVia(ee -> ee.exception().getMessage()));
* PCollection<KV<String, Integer>> output = result.output();
* PCollection<String> failures = result.failures();
* }</pre>
*/
@RequiresNonNull("fn")
public <FailureT> SimpleMapWithFailures<KV<K, V1>, KV<K, V2>, FailureT> exceptionsInto(
TypeDescriptor<FailureT> failureTypeDescriptor) {
return new SimpleMapWithFailures<>(
"MapValuesWithFailures", fn, getKvTypeDescriptor(), null, failureTypeDescriptor);
}

/**
* Returns a new {@link SimpleMapWithFailures} transform that catches exceptions raised while
* mapping elements, passing the raised exception instance and the input element being processed
* through the given {@code exceptionHandler} and emitting the result to a failure collection.
*
* <p>This method takes advantage of the type information provided by {@link InferableFunction},
* meaning that a call to {@link #exceptionsInto(TypeDescriptor)} may not be necessary.
*
* <p>See {@link WithFailures} documentation for usage patterns of the returned {@link
* WithFailures.Result}.
*
* <p>Example usage:
*
* <pre>{@code
* Result<PCollection<KV<String, Integer>>, String> result =
* input.apply(
* MapValues.into(TypeDescriptors.integers())
* .<String, String>via(word -> 1 / word.length) // Could throw ArithmeticException
* .exceptionsVia(
* new InferableFunction<ExceptionElement<KV<String, String>>, String>() {
* @Override
* public String apply(ExceptionElement<KV<String, String>> input) {
* return input.exception().getMessage();
* }
* }));
* PCollection<KV<String, Integer>> output = result.output();
* PCollection<String> failures = result.failures();
* }</pre>
*/
@RequiresNonNull("fn")
public <FailureT> SimpleMapWithFailures<KV<K, V1>, KV<K, V2>, FailureT> exceptionsVia(
InferableFunction<ExceptionElement<KV<K, V1>>, FailureT> exceptionHandler) {
return new SimpleMapWithFailures<>(
"MapValuesWithFailures",
fn,
getKvTypeDescriptor(),
exceptionHandler,
exceptionHandler.getOutputTypeDescriptor());
}

@Override
public PCollection<KV<K, V2>> expand(PCollection<KV<K, V1>> input) {
return input.apply(
"MapValues",
MapElements.into(getKvTypeDescriptor())
.via(checkNotNull(fn, "Must specify a function on MapValues using .via()")));
}

private TypeDescriptor<KV<K, V2>> getKvTypeDescriptor() {
return new TypeDescriptor<KV<K, V2>>() {}.where(new TypeParameter<V2>() {}, outputType);
}
}
Loading

0 comments on commit dd15d59

Please sign in to comment.