[WIP] Flink unified translation #28944

Closed. Wants to merge 38 commits.

Changes shown are from the first 20 of the 38 commits.

Commits
025cc3f  Add MetricsContainer support to the Flink sources. (Mar 8, 2023)
18beb2c  Add Flink PipelineOption of UseDataStreamForBatch. (Feb 13, 2023)
0666521  Modify the unit tests and runner validation tests to cover the … (Feb 15, 2023)
045271c  Update CHANGES.md to record the notable change. (Mar 7, 2023)
dd62a77  Fix compilation errors (jto, Sep 22, 2023)
ba1076c  Flink unified translator implementation (jto, Sep 27, 2023)
f9c26e6  Use Flink unified translator for portable executions (jto, Oct 11, 2023)
6f7bdba  Remove unused Flink translators (jto, Oct 11, 2023)
977c3f3  Make Reshuffle a Native transform (jto, Oct 11, 2023)
1ba0e95  Use unified translator for native streaming Pipelines (jto, Oct 12, 2023)
993b8d8  Fix checkstyle (jto, Oct 24, 2023)
26577d1  Remove Flink streaming translator (jto, Oct 24, 2023)
90e12a1  Apply Spotless (jto, Oct 24, 2023)
dfbac7f  Set execution mode in Unified pipeline context (jto, Oct 25, 2023)
02f7cbc  Favor composition in FlinkMetricsContainer implementations (jto, Sep 27, 2023)
86061e1  Revert "Remove unused Flink translators" (jto, Oct 26, 2023)
c42582d  Restore Portable batch pipeline implementation (jto, Oct 26, 2023)
e6fd9da  Revert "Use Flink unified translator for portable executions" (jto, Oct 26, 2023)
ff1fae9  Use unified for portable streaming but not for batch (jto, Oct 26, 2023)
c41382b  Spotless (jto, Oct 26, 2023)
6819ab8  Merge branch 'master' into flink_unified_translation (jto, Nov 7, 2023)
6bf08c3  Don't set parallelism on Impulse (jto, Nov 15, 2023)
dc14816  Fix translation order (jto, Nov 15, 2023)
b014d77  Spotless (jto, Nov 17, 2023)
eb0ba06  Merge branch 'master' into flink_unified_translation (jto, Nov 20, 2023)
ef6ba9a  Fix tuple projection translation (jto, Nov 21, 2023)
7afa8f8  Fix ParDoTranslator.getMainInput (jto, Nov 21, 2023)
53e4e42  Fix NPE with SchemaCoder (jto, Nov 21, 2023)
1ecef37  Fix imports (jto, Nov 21, 2023)
e6e7326  Set Impulse parallelism in Streaming mode (jto, Nov 21, 2023)
988caaa  Spotless (jto, Nov 21, 2023)
69153e6  Fix windowed singleton side inputs (jto, Nov 22, 2023)
4ccfc6f  Spotless (jto, Nov 22, 2023)
8a8d8d7  SpotBugs fix (jto, Nov 22, 2023)
0639e93  Merge branch 'master' into flink_unified_translation (jto, Nov 23, 2023)
ce1d317  Fix ReadSourcePortableTest (jto, Nov 23, 2023)
79f7156  Use unified runner for Python Batch (jto, Nov 23, 2023)
fa99411  Merge branch 'master' into flink_unified_translation (jto, Nov 29, 2023)

CHANGES.md (3 additions, 0 deletions)

@@ -337,6 +337,9 @@ as a workaround, a copy of "old" `CountingSource` class should be placed into a
 * The Go SDK now requires Go 1.19 to build. ([#25545](https://github.com/apache/beam/pull/25545))
 * The Go SDK now has an initial native Go implementation of a portable Beam Runner called Prism. ([#24789](https://github.com/apache/beam/pull/24789))
   * For more details and current state see https://github.com/apache/beam/tree/master/sdks/go/pkg/beam/runners/prism.
+* Add `UseDataStreamForBatch` pipeline option to the Flink runner. When set to true, the Flink runner executes batch
+  jobs using the DataStream API. By default the option is set to false, so batch jobs are still executed
+  using the DataSet API.
 
 ## Breaking Changes
 
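
A minimal sketch of how a user could opt in to the new behavior from Java. The PipelineOptionsFactory flow is standard Beam; the setUseDataStreamForBatch setter is an assumption inferred from the option name (the diffs below only show the getUseDataStreamForBatch accessor being used):

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class DataStreamBatchExample {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    options.setStreaming(false); // a bounded, batch-semantics job
    options.setUseDataStreamForBatch(true); // assumed setter: run the batch job on the DataStream API

    Pipeline p = Pipeline.create(options);
    // ... build the pipeline ...
    p.run().waitUntilFinish();
  }
}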

ParDoTranslation.java

@@ -463,17 +463,8 @@ public static TupleTag<?> getMainOutputTag(AppliedPTransform<?, ?, ?> application)
     return getMainOutputTag(getParDoPayload(application));
   }
 
-  public static TupleTagList getAdditionalOutputTags(AppliedPTransform<?, ?, ?> application)
+  public static TupleTagList getAdditionalOutputTags(RunnerApi.PTransform protoTransform)
       throws IOException {
-    PTransform<?, ?> transform = application.getTransform();
-    if (transform instanceof ParDo.MultiOutput) {
-      return ((ParDo.MultiOutput<?, ?>) transform).getAdditionalOutputTags();
-    }
-
-    RunnerApi.PTransform protoTransform =
-        PTransformTranslation.toProto(
-            application, SdkComponents.create(application.getPipeline().getOptions()));
-
     ParDoPayload payload = ParDoPayload.parseFrom(protoTransform.getSpec().getPayload());
     TupleTag<?> mainOutputTag = getMainOutputTag(payload);
     Set<String> outputTags =
@@ -487,6 +478,20 @@ public static TupleTagList getAdditionalOutputTags(AppliedPTransform<?, ?, ?> application)
     return TupleTagList.of(additionalOutputTags);
   }
 
+  public static TupleTagList getAdditionalOutputTags(AppliedPTransform<?, ?, ?> application)
+      throws IOException {
+    PTransform<?, ?> transform = application.getTransform();
+    if (transform instanceof ParDo.MultiOutput) {
+      return ((ParDo.MultiOutput<?, ?>) transform).getAdditionalOutputTags();
+    }
+
+    RunnerApi.PTransform protoTransform =
+        PTransformTranslation.toProto(
+            application, SdkComponents.create(application.getPipeline().getOptions()));
+
+    return getAdditionalOutputTags(protoTransform);
+  }
+
   public static Map<TupleTag<?>, Coder<?>> getOutputCoders(AppliedPTransform<?, ?, ?> application) {
     return application.getOutputs().entrySet().stream()
         .filter(e -> e.getValue() instanceof PCollection)
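
The refactor above splits the proto-handling tail of getAdditionalOutputTags into an overload that takes a RunnerApi.PTransform directly, so a translator that only holds the portable pipeline proto no longer needs an AppliedPTransform. A hypothetical call site (the helper and its names are illustrative, not code from this PR):

import java.io.IOException;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.sdk.values.TupleTagList;

final class SideOutputWiring {
  // Given a ParDo transform taken from the RunnerApi.Pipeline components,
  // list the additional (side) output tags the translator must wire up.
  static TupleTagList additionalOutputsOf(RunnerApi.PTransform parDoProto) throws IOException {
    // New overload from this PR: the tags are parsed from the ParDoPayload in the
    // proto spec, with no Java-SDK pipeline object involved.
    return ParDoTranslation.getAdditionalOutputTags(parDoProto);
  }
}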

AbstractStreamOperatorCompat.java

@@ -20,6 +20,7 @@
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl;
+import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;
 
 /** Compatibility layer for {@link AbstractStreamOperator} breaking changes. */
 public abstract class AbstractStreamOperatorCompat<OutputT>
@@ -44,9 +45,18 @@ protected int numProcessingTimeTimers() {
     return getTimeServiceManager()
         .map(
             manager -> {
-              final InternalTimeServiceManagerImpl<?> cast =
-                  (InternalTimeServiceManagerImpl<?>) getTimeServiceManagerCompat();
-              return cast.numProcessingTimeTimers();
+              InternalTimeServiceManager<?> tsm = getTimeServiceManagerCompat();
+              if (tsm instanceof InternalTimeServiceManagerImpl) {
+                final InternalTimeServiceManagerImpl<?> cast =
+                    (InternalTimeServiceManagerImpl<?>) getTimeServiceManagerCompat();
+                return cast.numProcessingTimeTimers();
+              } else if (tsm instanceof BatchExecutionInternalTimeServiceManager) {
+                return 0;
+              } else {
+                throw new IllegalStateException(
+                    String.format(
+                        "Unknown implementation of InternalTimerServiceManager. %s", tsm));
+              }
             })
         .orElse(0);
   }
runners/flink/flink_runner.gradle (11 additions, 0 deletions)

@@ -237,6 +237,7 @@ class ValidatesRunnerConfig {
   String name
   boolean streaming
   boolean checkpointing
+  boolean useDataStreamForBatch
   ArrayList<String> sickbayTests
 }
 
@@ -255,6 +256,7 @@ def createValidatesRunnerTask(Map m) {
     description = "Validates the ${runnerType} runner"
     def pipelineOptionsArray = ["--runner=TestFlinkRunner",
             "--streaming=${config.streaming}",
+            "--useDataStreamForBatch=${config.useDataStreamForBatch}",
             "--parallelism=2",
     ]
     if (config.checkpointing) {
@@ -314,12 +316,20 @@ def createValidatesRunnerTask(Map m) {
         excludeTestsMatching 'org.apache.beam.sdk.testing.TestStreamTest.testFirstElementLate'
         // https://github.com/apache/beam/issues/20844
         excludeTestsMatching 'org.apache.beam.sdk.testing.TestStreamTest.testLateDataAccumulating'
+        if (!config.streaming) {
+          // FlinkBatchExecutionInternalTimeService does not support timer registration on timer firing.
+          excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testOnTimerTimestampSkew'
+        } else {
+          // https://github.com/apache/beam/issues/25485
+          excludeTestsMatching 'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState'
+        }
       }
     }
   }
 }
 
 createValidatesRunnerTask(name: "validatesRunnerBatch", streaming: false, sickbayTests: sickbayTests)
+createValidatesRunnerTask(name: "validatesRunnerBatchWithDataStream", streaming: false, useDataStreamForBatch: true, sickbayTests: sickbayTests)
 createValidatesRunnerTask(name: "validatesRunnerStreaming", streaming: true, sickbayTests: sickbayTests)
 // We specifically have a variant which runs with checkpointing enabled for the
 // tests that require it since running a checkpoint variant is significantly
@@ -332,6 +342,7 @@ tasks.register('validatesRunner') {
   group = 'Verification'
   description "Validates Flink runner"
   dependsOn validatesRunnerBatch
+  dependsOn validatesRunnerBatchWithDataStream
   dependsOn validatesRunnerStreaming
   dependsOn validatesRunnerStreamingCheckpointing
 }

CreateStreamingFlinkView.java

@@ -36,7 +36,7 @@
 @SuppressWarnings({
   "rawtypes" // TODO(https://github.com/apache/beam/issues/20447)
 })
-class CreateStreamingFlinkView<ElemT, ViewT>
+public class CreateStreamingFlinkView<ElemT, ViewT>
     extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
   private final PCollectionView<ViewT> view;
 

FlinkPipelineExecutionEnvironment.java

@@ -20,13 +20,23 @@
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
 
 import java.util.Map;
+import java.util.UUID;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.core.construction.UnconsumedReads;
 import org.apache.beam.runners.core.construction.resources.PipelineResources;
 import org.apache.beam.runners.core.metrics.MetricsPusher;
+import org.apache.beam.runners.flink.unified.FlinkUnifiedPipelineTranslator;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -48,6 +58,43 @@
 })
 class FlinkPipelineExecutionEnvironment {
 
+  private static class UnifiedTranslatorWrapper extends FlinkPipelineTranslator {
+    private FlinkUnifiedPipelineTranslator translator;
+    private FlinkUnifiedPipelineTranslator.UnifiedTranslationContext context;
+
+    public UnifiedTranslatorWrapper(
+        StreamExecutionEnvironment env, PipelineOptions options, boolean isStreaming) {
+      FlinkPipelineOptions flinkOptions = options.as(FlinkPipelineOptions.class);
+      String invocationId =
+          String.format("%s_%s", flinkOptions.getJobName(), UUID.randomUUID().toString());
+
+      // The retrieval token is only required by the legacy artifact service, which the Flink
+      // runner no longer uses.
+      String retrievalToken =
+          ArtifactApi.CommitManifestResponse.Constants.NO_ARTIFACTS_STAGED_TOKEN
+              .getValueDescriptor()
+              .getOptions()
+              .getExtension(RunnerApi.beamConstant);
+
+      JobInfo jobInfo =
+          JobInfo.create(
+              invocationId,
+              flinkOptions.getJobName(),
+              retrievalToken,
+              PipelineOptionsTranslation.toProto(flinkOptions));
+
+      translator = FlinkUnifiedPipelineTranslator.createTranslator(isStreaming, false);
+      context = translator.createTranslationContext(jobInfo, flinkOptions, env, isStreaming, false);
+    }
+
+    @Override
+    public void translate(Pipeline pipeline) {
+      // Ensure all outputs of all reads are consumed.
+      UnconsumedReads.ensureAllReadsConsumed(pipeline);
+      translator.translate(context, PipelineTranslation.toProto(pipeline));
+    }
+  }
+
   private static final Logger LOG =
       LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class);
 
@@ -101,13 +148,18 @@ public void translate(Pipeline pipeline) {
     prepareFilesToStageForRemoteClusterExecution(options);
 
     FlinkPipelineTranslator translator;
-    if (options.isStreaming()) {
+    if (options.isStreaming() || options.getUseDataStreamForBatch()) {
      this.flinkStreamEnv = FlinkExecutionEnvironments.createStreamExecutionEnvironment(options);
       if (hasUnboundedOutput && !flinkStreamEnv.getCheckpointConfig().isCheckpointingEnabled()) {
         LOG.warn(
             "UnboundedSources present which rely on checkpointing, but checkpointing is disabled.");
       }
-      translator = new FlinkStreamingPipelineTranslator(flinkStreamEnv, options);
+      translator = new UnifiedTranslatorWrapper(flinkStreamEnv, options, options.isStreaming());
+      // translator = new FlinkStreamingPipelineTranslator(flinkStreamEnv, options,
+      //     options.isStreaming());
+      if (!options.isStreaming()) {
+        flinkStreamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
+      }
     } else {
       this.flinkBatchEnv = FlinkExecutionEnvironments.createBatchExecutionEnvironment(options);
       translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
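
For context on the setRuntimeMode call above: since Flink 1.12 the DataStream API can execute bounded jobs with batch semantics (FLIP-134), which is the mechanism the wrapper relies on when useDataStreamForBatch is set. A standalone sketch of that plain-Flink behavior, independent of Beam (the job itself is illustrative):

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedOnDataStream {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // BATCH mode schedules the bounded job stage by stage and needs no checkpointing,
    // mirroring what the wrapper selects for non-streaming pipelines.
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);

    env.fromElements(1, 2, 3, 4)
        .keyBy(x -> x % 2)      // group into odd and even keys
        .reduce(Integer::sum)   // per-key sum; emitted once at end of input in BATCH mode
        .print();

    env.execute("bounded-on-datastream");
  }
}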

FlinkPipelineOptions.java

@@ -32,7 +32,11 @@
  * requiring flink on the classpath (e.g. to use with the direct runner).
  */
 public interface FlinkPipelineOptions
-    extends PipelineOptions, ApplicationNameOptions, StreamingOptions, FileStagingOptions {
+    extends PipelineOptions,
+        ApplicationNameOptions,
+        StreamingOptions,
+        FileStagingOptions,
+        VersionDependentFlinkPipelineOptions {
 
   String AUTO = "[auto]";
   String PIPELINED = "PIPELINED";

FlinkPipelineRunner.java

@@ -34,6 +34,7 @@
 import org.apache.beam.runners.core.construction.graph.SplittableParDoExpander;
 import org.apache.beam.runners.core.construction.graph.TrivialNativeTransformExpander;
 import org.apache.beam.runners.core.metrics.MetricsPusher;
+import org.apache.beam.runners.flink.unified.FlinkUnifiedPipelineTranslator;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.runners.jobsubmission.PortablePipelineJarUtils;
 import org.apache.beam.runners.jobsubmission.PortablePipelineResult;
@@ -92,9 +93,10 @@ public PortablePipelineResult run(final Pipeline pipeline, JobInfo jobInfo) throws Exception {
     FlinkPortablePipelineTranslator<?> translator;
     if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(pipeline)) {
       // TODO: Do we need to inspect for unbounded sources before fusing?
+      // translator = FlinkUnifiedPipelineTranslator.createTranslator(false, true);
       translator = FlinkBatchPortablePipelineTranslator.createTranslator();
     } else {
-      translator = new FlinkStreamingPortablePipelineTranslator();
+      translator = FlinkUnifiedPipelineTranslator.createTranslator(true, true);
     }
     return runPipelineWithTranslator(pipeline, jobInfo, translator);
   }
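
Reading the two call sites together (createTranslator(isStreaming, false) in the classic-runner wrapper, createTranslator(true, true) here), the two booleans appear to be a streaming flag and a portable-execution flag. A hypothetical wrapper that records that inferred reading at the call site (the parameter names are guesses, not taken from this PR):

import org.apache.beam.runners.flink.unified.FlinkUnifiedPipelineTranslator;

final class UnifiedTranslators {
  // Inferred flag meanings: first = streaming execution, second = portable (Fn API) execution.
  static FlinkUnifiedPipelineTranslator create(boolean streaming, boolean portable) {
    return FlinkUnifiedPipelineTranslator.createTranslator(streaming, portable);
  }

  // Classic (non-portable) streaming job, as selected by the wrapper in
  // FlinkPipelineExecutionEnvironment.
  static FlinkUnifiedPipelineTranslator classicStreaming() {
    return create(true, false);
  }

  // Portable streaming job, the case this PR switches to the unified translator.
  static FlinkUnifiedPipelineTranslator portableStreaming() {
    return create(true, true);
  }
}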