diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java index a5ad5e9b1f1d..4dd47a82e66c 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java @@ -110,7 +110,10 @@ public void mapPartition(Iterable> input, // TODO: Support multiple output receivers and redirect them properly. Map>> outputCoders = processBundleDescriptor.getOutputTargetCoders(); - BeamFnApi.Target outputTarget = Iterables.getOnlyElement(outputCoders.keySet()); + BeamFnApi.Target outputTarget = null; + if (outputCoders.size() > 0) { + outputTarget = Iterables.getOnlyElement(outputCoders.keySet()); + } Coder outputCoder = Iterables.getOnlyElement(outputCoders.values()); SdkHarnessClient.RemoteOutputReceiver> mainOutputReceiver = new SdkHarnessClient.RemoteOutputReceiver>() { @@ -131,8 +134,15 @@ public void accept(WindowedValue input) throws Exception { }; } }; - SdkHarnessClient.ActiveBundle bundle = processor.newBundle( - ImmutableMap.of(outputTarget, mainOutputReceiver)); + Map> receiverMap; + if (outputTarget == null) { + receiverMap = ImmutableMap.of(outputTarget, mainOutputReceiver); + } else { + receiverMap = ImmutableMap.of(); + } + + SdkHarnessClient.ActiveBundle bundle = processor.newBundle(receiverMap); try (CloseableFnDataReceiver> inputReceiver = bundle.getInputReceiver()) { for (WindowedValue value : input) { inputReceiver.accept(value);