From 49a6c18bef7ed39f625e5bc26aae388ea046433c Mon Sep 17 00:00:00 2001 From: "marek.simunek" Date: Wed, 3 Oct 2018 13:24:03 +0200 Subject: [PATCH] [BEAM-3900] various TypeDescriptor bugfixes --- .../euphoria/core/client/operator/CountByKey.java | 2 +- .../euphoria/core/translate/OperatorTransform.java | 3 +++ .../euphoria/core/translate/ReduceByKeyTranslator.java | 6 +++++- .../extensions/euphoria/core/translate/UnionTranslator.java | 6 ++++-- 4 files changed, 13 insertions(+), 4 deletions(-) diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/CountByKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/CountByKey.java index de09586a2302..cdc44190787f 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/CountByKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/CountByKey.java @@ -266,7 +266,7 @@ private CountByKey( public Dataset> expand(List> inputs) { return ReduceByKey.named(getName().orElse(null)) .of(Iterables.getOnlyElement(inputs)) - .keyBy(getKeyExtractor()) + .keyBy(getKeyExtractor(), getKeyType().orElse(null)) .valueBy(v -> 1L, TypeDescriptors.longs()) .combineBy(Sums.ofLongs()) .applyIf( diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/OperatorTransform.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/OperatorTransform.java index e012b0a0a2ee..707c5b2824a4 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/OperatorTransform.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/OperatorTransform.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.extensions.euphoria.core.translate; +import com.google.common.base.Preconditions; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -51,6 +52,8 @@ public static > Dataset operator.getClass().getName()), new OperatorTransform<>(operator, maybeTranslator.get())); + Preconditions.checkState( + output.getTypeDescriptor() != null, "Translator should always return typed PCollection."); return Dataset.of(output, operator); } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/ReduceByKeyTranslator.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/ReduceByKeyTranslator.java index fdce6c8bf869..05bc0703dadd 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/ReduceByKeyTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/ReduceByKeyTranslator.java @@ -86,7 +86,11 @@ public PCollection> translate( Combine.perKey(asCombiner(reducer, accumulators, operator.getName().orElse(null)))); @SuppressWarnings("unchecked") final PCollection> casted = (PCollection) combined; - return casted; + return casted.setTypeDescriptor( + operator + .getOutputType() + .orElseThrow( + () -> new IllegalStateException("Unable to infer output type descriptor."))); } return extracted diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/UnionTranslator.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/UnionTranslator.java index f3eccef826a4..0c967c289e4a 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/UnionTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/UnionTranslator.java @@ -21,15 +21,17 @@ import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.TypeDescriptor; /** Euphoria to Beam translation of {@link Union} operator. */ class UnionTranslator implements OperatorTranslator> { @Override public PCollection translate(Union operator, PCollectionList inputs) { + final TypeDescriptor outputType = operator.getOutputType().orElse(null); return operator .getName() - .map(name -> inputs.apply(name, Flatten.pCollections())) - .orElseGet(() -> inputs.apply(Flatten.pCollections())); + .map(name -> inputs.apply(name, Flatten.pCollections()).setTypeDescriptor(outputType)) + .orElseGet(() -> inputs.apply(Flatten.pCollections()).setTypeDescriptor(outputType)); } }