Skip to content

Commit

Permalink
Use Flink unified translator for portable executions
Browse files Browse the repository at this point in the history
  • Loading branch information
jto committed Oct 25, 2023
1 parent ba1076c commit f9c26e6
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.runners.flink;

import static org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage;
import org.apache.beam.runners.flink.unified.FlinkUnifiedPipelineTranslator;
import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.hasUnboundedPCollections;

import java.util.List;
Expand Down Expand Up @@ -92,10 +93,11 @@ public PortablePipelineResult run(final Pipeline pipeline, JobInfo jobInfo) thro
FlinkPortablePipelineTranslator<?> translator;
if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(pipeline)) {
// TODO: Do we need to inspect for unbounded sources before fusing?
translator = FlinkBatchPortablePipelineTranslator.createTranslator();
translator = FlinkUnifiedPipelineTranslator.createTranslator(false, true);
} else {
translator = new FlinkStreamingPortablePipelineTranslator();
translator = FlinkUnifiedPipelineTranslator.createTranslator(true, true);
}

return runPipelineWithTranslator(pipeline, jobInfo, translator);
}

Expand Down Expand Up @@ -129,6 +131,7 @@ PortablePipelineResult runPipelineWithTranslator(
translator.translate(
translator.createTranslationContext(jobInfo, pipelineOptions, confDir, filesToStage),
fusedPipeline);

final JobExecutionResult result = executor.execute(pipelineOptions.getJobName());

return createPortablePipelineResult(result, pipelineOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.beam.runners.flink;

import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Sample;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.not;
Expand Down Expand Up @@ -111,8 +114,8 @@ public void testExecution() throws Exception {
Pipeline p = Pipeline.create(options);
PCollection<Long> result =
p.apply(Read.from(new Source(10)))
// FIXME: the test fails without this
.apply(Window.into(FixedWindows.of(Duration.millis(1))));
// FIXME: the test fails without this
.apply(Window.into(FixedWindows.of(Duration.millis(1))));

PAssert.that(result)
.containsInAnyOrder(ImmutableList.of(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L));
Expand Down Expand Up @@ -148,27 +151,30 @@ public void testExecution() throws Exception {
}
}

private static class Source extends UnboundedSource<Long, Source.Checkpoint> {
private static class Source extends BoundedSource<Long> {

private final int count;
private final Instant now = Instant.now();

Source(int count) {
this.count = count;
}

@Override
public List<? extends UnboundedSource<Long, Checkpoint>> split(
int desiredNumSplits, PipelineOptions options) {
public List<? extends BoundedSource<Long>> split(
long desiredBundleSizeBytes, PipelineOptions options) {

return Collections.singletonList(this);
}

@Override
public UnboundedReader<Long> createReader(
PipelineOptions options, @Nullable Checkpoint checkpointMark) {
public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
return -1;
}

@Override
public BoundedReader<Long> createReader(PipelineOptions options) {

return new UnboundedReader<Long>() {
return new BoundedReader<Long>() {
int pos = -1;

@Override
Expand All @@ -182,19 +188,7 @@ public boolean advance() {
}

@Override
public Instant getWatermark() {
return pos < count
? BoundedWindow.TIMESTAMP_MIN_VALUE
: BoundedWindow.TIMESTAMP_MAX_VALUE;
}

@Override
public CheckpointMark getCheckpointMark() {
return new Checkpoint(pos);
}

@Override
public UnboundedSource<Long, ?> getCurrentSource() {
public BoundedSource<Long> getCurrentSource() {
return Source.this;
}

Expand All @@ -203,41 +197,16 @@ public Long getCurrent() throws NoSuchElementException {
return (long) pos;
}

@Override
public Instant getCurrentTimestamp() throws NoSuchElementException {
return now;
}

@Override
public void close() {}
};
}

@Override
public boolean requiresDeduping() {
return false;
}

@Override
public Coder<Long> getOutputCoder() {
// use SerializableCoder to test custom java coders work
return SerializableCoder.of(Long.class);
}

@Override
public Coder<Checkpoint> getCheckpointMarkCoder() {
return SerializableCoder.of(Checkpoint.class);
}

private static class Checkpoint implements CheckpointMark, Serializable {
final int pos;

Checkpoint(int pos) {
this.pos = pos;
}

@Override
public void finalizeCheckpoint() {}
}
}
}

0 comments on commit f9c26e6

Please sign in to comment.