Skip to content

Commit

Permalink
Merge pull request #43 from seznam/simunek/typeDescriptorFixes
Browse files: browse the repository at this point in the history
[BEAM-3900] TypeDescriptor fixes
  • Loading branch information
VaclavPlajt authored Oct 4, 2018
2 parents 5875038 + 49a6c18 commit eb75b14
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ private CountByKey(
public Dataset<KV<KeyT, Long>> expand(List<Dataset<InputT>> inputs) {
return ReduceByKey.named(getName().orElse(null))
.of(Iterables.getOnlyElement(inputs))
.keyBy(getKeyExtractor())
.keyBy(getKeyExtractor(), getKeyType().orElse(null))
.valueBy(v -> 1L, TypeDescriptors.longs())
.combineBy(Sums.ofLongs())
.applyIf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.extensions.euphoria.core.translate;

import com.google.common.base.Preconditions;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -51,6 +52,8 @@ public static <InputT, OutputT, OperatorT extends Operator<OutputT>> Dataset<Out
inputList.apply(
operator.getName().orElseGet(() -> operator.getClass().getName()),
new OperatorTransform<>(operator, maybeTranslator.get()));
Preconditions.checkState(
output.getTypeDescriptor() != null, "Translator should always return typed PCollection.");
return Dataset.of(output, operator);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,11 @@ public PCollection<KV<KeyT, OutputT>> translate(
Combine.perKey(asCombiner(reducer, accumulators, operator.getName().orElse(null))));
@SuppressWarnings("unchecked")
final PCollection<KV<KeyT, OutputT>> casted = (PCollection) combined;
return casted;
return casted.setTypeDescriptor(
operator
.getOutputType()
.orElseThrow(
() -> new IllegalStateException("Unable to infer output type descriptor.")));
}

return extracted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,17 @@
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptor;

/**
 * Euphoria to Beam translation of {@link Union} operator.
 *
 * <p>The translation applies {@link Flatten#pCollections()} to the list of input collections and
 * tags the result with the type descriptor taken from the operator.
 *
 * @param <InputT> element type shared by the inputs and the output of the union
 */
class UnionTranslator<InputT> implements OperatorTranslator<InputT, InputT, Union<InputT>> {

/**
 * Flattens the given inputs into a single collection.
 *
 * @param operator the union operator; its optional name is used as the transform name and its
 *     optional output type as the resulting type descriptor
 * @param inputs the collections to flatten
 * @return the result of applying {@code Flatten.pCollections()} to {@code inputs}
 */
@Override
public PCollection<InputT> translate(Union<InputT> operator, PCollectionList<InputT> inputs) {
// Falls back to null when the operator declares no output type, so setTypeDescriptor below may
// receive null. NOTE(review): confirm that a null descriptor is acceptable to callers.
final TypeDescriptor<InputT> outputType = operator.getOutputType().orElse(null);
return operator
.getName()
// NOTE(review): the next two lines look like the pre-change variant of this chain (no type
// descriptor set) shown by the diff view; the pair after them appears to supersede them.
// Confirm against the committed file.
.map(name -> inputs.apply(name, Flatten.pCollections()))
.orElseGet(() -> inputs.apply(Flatten.pCollections()));
// Named operator: apply Flatten under that name; unnamed: apply it without an explicit name.
// Both branches set the output type descriptor on the flattened collection.
.map(name -> inputs.apply(name, Flatten.pCollections()).setTypeDescriptor(outputType))
.orElseGet(() -> inputs.apply(Flatten.pCollections()).setTypeDescriptor(outputType));
}
}

0 comments on commit eb75b14

Please sign in to comment.