Skip to content

Commit

Permalink
This closes apache#2053
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha committed Feb 23, 2017
2 parents 00b3958 + 0ac985c commit fbcde4c
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 34 deletions.
11 changes: 11 additions & 0 deletions runners/flink/runner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,17 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-core-construction-java</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Test scoped -->

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class FlinkPipelineExecutionEnvironment {
* a {@link org.apache.flink.api.java.DataSet}
* or {@link org.apache.flink.streaming.api.datastream.DataStream} one.
* */
public void translate(Pipeline pipeline) {
public void translate(FlinkRunner flinkRunner, Pipeline pipeline) {
this.flinkBatchEnv = null;
this.flinkStreamEnv = null;

Expand All @@ -92,7 +92,7 @@ public void translate(Pipeline pipeline) {
FlinkPipelineTranslator translator;
if (translationMode == TranslationMode.STREAMING) {
this.flinkStreamEnv = createStreamExecutionEnvironment();
translator = new FlinkStreamingPipelineTranslator(flinkStreamEnv, options);
translator = new FlinkStreamingPipelineTranslator(flinkRunner, flinkStreamEnv, options);
} else {
this.flinkBatchEnv = createBatchExecutionEnvironment();
translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.client.program.DetachedEnvironment;
Expand Down Expand Up @@ -129,7 +126,7 @@ public PipelineResult run(Pipeline pipeline) {
FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(options);

LOG.info("Translating pipeline to Flink program.");
env.translate(pipeline);
env.translate(this, pipeline);

JobExecutionResult result;
try {
Expand Down Expand Up @@ -165,33 +162,6 @@ public FlinkPipelineOptions getPipelineOptions() {
return options;
}

@Override
public <OutputT extends POutput, InputT extends PInput> OutputT apply(
PTransform<InputT, OutputT> transform, InputT input) {
if (overrides.containsKey(transform.getClass())) {
// It is the responsibility of whoever constructs overrides to ensure this is type safe.
@SuppressWarnings("unchecked")
Class<PTransform<InputT, OutputT>> transformClass =
(Class<PTransform<InputT, OutputT>>) transform.getClass();

@SuppressWarnings("unchecked")
Class<PTransform<InputT, OutputT>> customTransformClass =
(Class<PTransform<InputT, OutputT>>) overrides.get(transform.getClass());

PTransform<InputT, OutputT> customTransform =
InstanceBuilder.ofType(customTransformClass)
.withArg(FlinkRunner.class, this)
.withArg(transformClass, transform)
.build();

return Pipeline.applyTransform(input, customTransform);
} else {
return super.apply(transform, input);
}
}

/////////////////////////////////////////////////////////////////////////////

@Override
public String toString() {
return "FlinkRunner#" + hashCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,19 @@
*/
package org.apache.beam.runners.flink;

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.values.PValue;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
Expand All @@ -40,8 +50,54 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {

private int depth = 0;

public FlinkStreamingPipelineTranslator(StreamExecutionEnvironment env, PipelineOptions options) {
private FlinkRunner flinkRunner;

public FlinkStreamingPipelineTranslator(
FlinkRunner flinkRunner,
StreamExecutionEnvironment env,
PipelineOptions options) {
this.streamingContext = new FlinkStreamingTranslationContext(env, options);
this.flinkRunner = flinkRunner;
}

@Override
public void translate(Pipeline pipeline) {
Map<PTransformMatcher, PTransformOverrideFactory> transformOverrides =
ImmutableMap.<PTransformMatcher, PTransformOverrideFactory>builder()
.put(
PTransformMatchers.classEqualTo(View.AsIterable.class),
new ReflectiveOneToOneOverrideFactory(
FlinkStreamingViewOverrides.StreamingViewAsIterable.class, flinkRunner))
.put(
PTransformMatchers.classEqualTo(View.AsList.class),
new ReflectiveOneToOneOverrideFactory(
FlinkStreamingViewOverrides.StreamingViewAsList.class, flinkRunner))
.put(
PTransformMatchers.classEqualTo(View.AsMap.class),
new ReflectiveOneToOneOverrideFactory(
FlinkStreamingViewOverrides.StreamingViewAsMap.class, flinkRunner))
.put(
PTransformMatchers.classEqualTo(View.AsMultimap.class),
new ReflectiveOneToOneOverrideFactory(
FlinkStreamingViewOverrides.StreamingViewAsMultimap.class, flinkRunner))
.put(
PTransformMatchers.classEqualTo(View.AsSingleton.class),
new ReflectiveOneToOneOverrideFactory(
FlinkStreamingViewOverrides.StreamingViewAsSingleton.class, flinkRunner))
// this has to be last since the ViewAsSingleton override
// can expand to a Combine.GloballyAsSingletonView
.put(
PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
new ReflectiveOneToOneOverrideFactory(
FlinkStreamingViewOverrides.StreamingCombineGloballyAsSingletonView.class,
flinkRunner))
.build();

for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override :
transformOverrides.entrySet()) {
pipeline.replace(override.getKey(), override.getValue());
}
super.translate(pipeline);
}

// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -147,4 +203,28 @@ boolean canTranslate(T transform, FlinkStreamingTranslationContext context) {
return true;
}
}

private static class ReflectiveOneToOneOverrideFactory<
InputT extends PValue,
OutputT extends PValue,
TransformT extends PTransform<InputT, OutputT>>
extends SingleInputOutputOverrideFactory<InputT, OutputT, TransformT> {
private final Class<PTransform<InputT, OutputT>> replacement;
private final FlinkRunner runner;

private ReflectiveOneToOneOverrideFactory(
Class<PTransform<InputT, OutputT>> replacement, FlinkRunner runner) {
this.replacement = replacement;
this.runner = runner;
}

@Override
public PTransform<InputT, OutputT> getReplacementTransform(TransformT transform) {
return InstanceBuilder.ofType(replacement)
.withArg(FlinkRunner.class, runner)
.withArg((Class<PTransform<InputT, OutputT>>) transform.getClass(), transform)
.build();
}
}

}

0 comments on commit fbcde4c

Please sign in to comment.