From 12530a38ca1fd902371304d3b2ca2dd6eb9e58b0 Mon Sep 17 00:00:00 2001 From: Kyle Weaver Date: Mon, 21 Mar 2022 14:47:04 -0700 Subject: [PATCH] [BEAM-14139] Drop support for Flink 1.11. --- gradle.properties | 2 +- runners/flink/1.11/build.gradle | 33 ------- .../1.11/job-server-container/build.gradle | 26 ------ runners/flink/1.11/job-server/build.gradle | 31 ------- .../AbstractStreamOperatorCompat.java | 61 ------------ .../runners/flink/FlinkRunnerTestCompat.java | 47 ---------- .../flink/SourceTransformationCompat.java | 28 ------ runners/flink/1.12/build.gradle | 4 +- .../runners/flink/RemoteMiniClusterImpl.java | 0 .../flink/metrics/MetricGroupWrapper.java | 0 .../flink/streaming/StreamSources.java | 22 +---- runners/flink/1.13/build.gradle | 4 +- .../flink/streaming/StreamSources.java | 92 ------------------- runners/flink/1.14/build.gradle | 4 +- .../flink/streaming/StreamSources.java | 22 +---- .../beam/runners/flink/FlinkRunnerTest.java | 41 +++++---- ...linkStreamingTransformTranslatorsTest.java | 13 ++- settings.gradle.kts | 4 - 18 files changed, 41 insertions(+), 393 deletions(-) delete mode 100644 runners/flink/1.11/build.gradle delete mode 100644 runners/flink/1.11/job-server-container/build.gradle delete mode 100644 runners/flink/1.11/job-server/build.gradle delete mode 100644 runners/flink/1.11/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java delete mode 100644 runners/flink/1.11/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTestCompat.java delete mode 100644 runners/flink/1.11/src/test/java/org/apache/beam/runners/flink/SourceTransformationCompat.java rename runners/flink/{1.11 => 1.12}/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java (100%) rename runners/flink/{1.11 => 1.12}/src/test/java/org/apache/beam/runners/flink/metrics/MetricGroupWrapper.java (100%) delete mode 100644 runners/flink/1.13/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java diff --git a/gradle.properties b/gradle.properties index 1657c380bd2e..d84b4e4e2bee 100644 --- a/gradle.properties +++ b/gradle.properties @@ -37,5 +37,5 @@ javaVersion=1.8 docker_image_default_repo_root=apache docker_image_default_repo_prefix=beam_ -flink_versions=1.11,1.12,1.13,1.14 +flink_versions=1.12,1.13,1.14 diff --git a/runners/flink/1.11/build.gradle b/runners/flink/1.11/build.gradle deleted file mode 100644 index 81cc0a0c7408..000000000000 --- a/runners/flink/1.11/build.gradle +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -def basePath = '..' -/* All properties required for loading the Flink build script */ -project.ext { - // Set the version of all Flink-related dependencies here. - flink_version = '1.11.6' - // Version specific code overrides. - main_source_overrides = ['./src/main/java'] - test_source_overrides = ['./src/test/java'] - main_resources_overrides = [] - test_resources_overrides = [] - archives_base_name = 'beam-runners-flink-1.11' -} - -// Load the main build script which contains all build logic. -apply from: "$basePath/flink_runner.gradle" diff --git a/runners/flink/1.11/job-server-container/build.gradle b/runners/flink/1.11/job-server-container/build.gradle deleted file mode 100644 index afdb68a0fc91..000000000000 --- a/runners/flink/1.11/job-server-container/build.gradle +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -def basePath = '../../job-server-container' - -project.ext { - resource_path = basePath -} - -// Load the main build script which contains all build logic. -apply from: "$basePath/flink_job_server_container.gradle" diff --git a/runners/flink/1.11/job-server/build.gradle b/runners/flink/1.11/job-server/build.gradle deleted file mode 100644 index 00801231aaaf..000000000000 --- a/runners/flink/1.11/job-server/build.gradle +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -def basePath = '../../job-server' - -project.ext { - // Look for the source code in the parent module - main_source_dirs = ["$basePath/src/main/java"] - test_source_dirs = ["$basePath/src/test/java"] - main_resources_dirs = ["$basePath/src/main/resources"] - test_resources_dirs = ["$basePath/src/test/resources"] - archives_base_name = 'beam-runners-flink-1.11-job-server' -} - -// Load the main build script which contains all build logic. -apply from: "$basePath/flink_job_server.gradle" diff --git a/runners/flink/1.11/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java b/runners/flink/1.11/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java deleted file mode 100644 index d1bcff753a42..000000000000 --- a/runners/flink/1.11/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming; - -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; - -/** Compatibility layer for {@link AbstractStreamOperator} breaking changes. */ -public abstract class AbstractStreamOperatorCompat - extends AbstractStreamOperator { - // timeServiceManager was made private behind a getter in Flink 1.11 - protected InternalTimeServiceManager getTimeServiceManagerCompat() { - return getTimeServiceManager().get(); - } - - /** Release all of the operator's resources. */ - abstract void cleanUp() throws Exception; - - /** Flush all remaining buffered data. */ - abstract void flushData() throws Exception; - - // Prior to Flink 1.14, dispose() releases the operator's resources, while close() flushes - // remaining data and then releases the operator's resources. - // https://issues.apache.org/jira/browse/FLINK-22972 - - @Override - public void dispose() throws Exception { - try { - cleanUp(); - } finally { - // This releases all task's resources. We need to call this last - // to ensure that state, timers, or output buffers can still be - // accessed during finishing the bundle. - super.dispose(); - } - } - - @Override - public void close() throws Exception { - try { - flushData(); - } finally { - super.close(); - } - } -} diff --git a/runners/flink/1.11/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTestCompat.java b/runners/flink/1.11/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTestCompat.java deleted file mode 100644 index b49f323a60e5..000000000000 --- a/runners/flink/1.11/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTestCompat.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import org.apache.flink.client.program.OptimizerPlanEnvironment; -import org.apache.flink.client.program.PackagedProgram; -import org.apache.flink.client.program.PackagedProgramUtils; -import org.apache.flink.client.program.ProgramInvocationException; -import org.apache.flink.configuration.Configuration; - -/** - * Compatibility layer for {@link PackagedProgram} and {@link OptimizerPlanEnvironment} breaking - * changes. - */ -public abstract class FlinkRunnerTestCompat { - public PackagedProgram getPackagedProgram() throws ProgramInvocationException { - return PackagedProgram.newBuilder().setEntryPointClassName(getClass().getName()).build(); - } - - public OptimizerPlanEnvironment getOptimizerPlanEnvironment() { - int parallelism = Runtime.getRuntime().availableProcessors(); - return new OptimizerPlanEnvironment( - new Configuration(), getClass().getClassLoader(), parallelism); - } - - public void getPipeline(OptimizerPlanEnvironment env, PackagedProgram packagedProgram) - throws ProgramInvocationException { - int parallelism = Runtime.getRuntime().availableProcessors(); - PackagedProgramUtils.getPipelineFromProgram( - packagedProgram, env.getConfiguration(), parallelism, true); - } -} diff --git a/runners/flink/1.11/src/test/java/org/apache/beam/runners/flink/SourceTransformationCompat.java b/runners/flink/1.11/src/test/java/org/apache/beam/runners/flink/SourceTransformationCompat.java deleted file mode 100644 index 43fa95b6acef..000000000000 --- a/runners/flink/1.11/src/test/java/org/apache/beam/runners/flink/SourceTransformationCompat.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import org.apache.flink.streaming.api.operators.StreamSource; -import org.apache.flink.streaming.api.transformations.LegacySourceTransformation; - -/** Compatibility layer for {@link LegacySourceTransformation} rename. */ -public class SourceTransformationCompat { - public static StreamSource getOperator(Object sourceTransform) { - return ((LegacySourceTransformation) sourceTransform).getOperator(); - } -} diff --git a/runners/flink/1.12/build.gradle b/runners/flink/1.12/build.gradle index ed62e0f658e8..96ecec453b0b 100644 --- a/runners/flink/1.12/build.gradle +++ b/runners/flink/1.12/build.gradle @@ -22,8 +22,8 @@ project.ext { // Set the version of all Flink-related dependencies here. flink_version = '1.12.7' // Version specific code overrides. - main_source_overrides = ["${basePath}/1.11/src/main/java", './src/main/java'] - test_source_overrides = ["${basePath}/1.11/src/test/java", './src/test/java'] + main_source_overrides = ['./src/main/java'] + test_source_overrides = ['./src/test/java'] main_resources_overrides = [] test_resources_overrides = [] archives_base_name = 'beam-runners-flink-1.12' diff --git a/runners/flink/1.11/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java b/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java similarity index 100% rename from runners/flink/1.11/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java rename to runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java diff --git a/runners/flink/1.11/src/test/java/org/apache/beam/runners/flink/metrics/MetricGroupWrapper.java b/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/metrics/MetricGroupWrapper.java similarity index 100% rename from runners/flink/1.11/src/test/java/org/apache/beam/runners/flink/metrics/MetricGroupWrapper.java rename to runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/metrics/MetricGroupWrapper.java diff --git a/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java b/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java index 38df4adf7a5e..ba5940653cd6 100644 --- a/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java +++ b/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java @@ -17,40 +17,20 @@ */ package org.apache.beam.runners.flink.streaming; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; -import org.apache.flink.api.dag.Transformation; import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamSource; -import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; import org.apache.flink.streaming.runtime.tasks.OperatorChain; import org.apache.flink.streaming.runtime.tasks.StreamTask; -/** - * {@link StreamSource} utilities, that bridge incompatibilities between Flink releases. - * - *

This change is becauses RecordWriter is wrapped in RecordWriterDelegate in 1.10, please refer - * to https://github.com/apache/flink/commit/2c8b4ef572f05bf4740b7e204af1e5e709cd945c for more - * details. - */ +/** {@link StreamSource} utilities, that bridge incompatibilities between Flink releases. */ public class StreamSources { - /** - * Backward compatibility helper for {@link OneInputTransformation} `getInput` method, that has - * been removed in Flink 1.12. - * - * @param source Source to get single input from. - * @return Input transformation. - */ - public static Transformation getOnlyInput(OneInputTransformation source) { - return Iterables.getOnlyElement(source.getInputs()); - } - public static > void run( StreamSource streamSource, Object lockingObject, diff --git a/runners/flink/1.13/build.gradle b/runners/flink/1.13/build.gradle index d29d2f9059fd..4045df69e2bb 100644 --- a/runners/flink/1.13/build.gradle +++ b/runners/flink/1.13/build.gradle @@ -22,8 +22,8 @@ project.ext { // Set the version of all Flink-related dependencies here. flink_version = '1.13.5' // Version specific code overrides. - main_source_overrides = ["${basePath}/1.11/src/main/java", "${basePath}/1.12/src/main/java", './src/main/java'] - test_source_overrides = ["${basePath}/1.11/src/test/java", "${basePath}/1.12/src/test/java", './src/test/java'] + main_source_overrides = ["${basePath}/1.12/src/main/java", './src/main/java'] + test_source_overrides = ["${basePath}/1.12/src/test/java", './src/test/java'] main_resources_overrides = [] test_resources_overrides = [] archives_base_name = 'beam-runners-flink-1.13' diff --git a/runners/flink/1.13/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java b/runners/flink/1.13/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java deleted file mode 100644 index 38df4adf7a5e..000000000000 --- a/runners/flink/1.13/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.streaming; - -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; -import org.apache.flink.api.dag.Transformation; -import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.api.operators.StreamSource; -import org.apache.flink.streaming.api.transformations.OneInputTransformation; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; -import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; -import org.apache.flink.streaming.runtime.tasks.OperatorChain; -import org.apache.flink.streaming.runtime.tasks.StreamTask; - -/** - * {@link StreamSource} utilities, that bridge incompatibilities between Flink releases. - * - *

This change is becauses RecordWriter is wrapped in RecordWriterDelegate in 1.10, please refer - * to https://github.com/apache/flink/commit/2c8b4ef572f05bf4740b7e204af1e5e709cd945c for more - * details. - */ -public class StreamSources { - - /** - * Backward compatibility helper for {@link OneInputTransformation} `getInput` method, that has - * been removed in Flink 1.12. - * - * @param source Source to get single input from. - * @return Input transformation. - */ - public static Transformation getOnlyInput(OneInputTransformation source) { - return Iterables.getOnlyElement(source.getInputs()); - } - - public static > void run( - StreamSource streamSource, - Object lockingObject, - Output> collector) - throws Exception { - streamSource.run( - lockingObject, - new TestStreamStatusMaintainer(), - collector, - createOperatorChain(streamSource)); - } - - private static OperatorChain createOperatorChain(AbstractStreamOperator operator) { - return new OperatorChain<>( - operator.getContainingTask(), - StreamTask.createRecordWriterDelegate( - operator.getOperatorConfig(), new MockEnvironmentBuilder().build())); - } - - /** StreamStatusMaintainer was removed in Flink 1.14. */ - private static final class TestStreamStatusMaintainer implements StreamStatusMaintainer { - StreamStatus currentStreamStatus = StreamStatus.ACTIVE; - - @Override - public void toggleStreamStatus(StreamStatus streamStatus) { - if (!currentStreamStatus.equals(streamStatus)) { - currentStreamStatus = streamStatus; - } - } - - @Override - public StreamStatus getStreamStatus() { - return currentStreamStatus; - } - } - - /** The emitWatermarkStatus method was added in Flink 1.14, so we need to wrap Output. */ - public interface OutputWrapper extends Output {} -} diff --git a/runners/flink/1.14/build.gradle b/runners/flink/1.14/build.gradle index 0281f87d33d6..30cb130249c6 100644 --- a/runners/flink/1.14/build.gradle +++ b/runners/flink/1.14/build.gradle @@ -23,8 +23,8 @@ project.ext { // Set the version of all Flink-related dependencies here. flink_version = '1.14.3' // Version specific code overrides. - main_source_overrides = ["${basePath}/1.11/src/main/java", "${basePath}/1.12/src/main/java", "${basePath}/1.13/src/main/java", './src/main/java'] - test_source_overrides = ["${basePath}/1.11/src/test/java", "${basePath}/1.12/src/test/java", "${basePath}/1.13/src/test/java", './src/test/java'] + main_source_overrides = ["${basePath}/1.12/src/main/java", "${basePath}/1.13/src/main/java", './src/main/java'] + test_source_overrides = ["${basePath}/1.12/src/test/java", "${basePath}/1.13/src/test/java", './src/test/java'] main_resources_overrides = [] test_resources_overrides = [] archives_base_name = 'beam-runners-flink-1.14' diff --git a/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java b/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java index 0f198986c563..d1e38549d8ed 100644 --- a/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java +++ b/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java @@ -17,40 +17,20 @@ */ package org.apache.beam.runners.flink.streaming; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; -import org.apache.flink.api.dag.Transformation; import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamSource; -import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.OperatorChain; import org.apache.flink.streaming.runtime.tasks.RegularOperatorChain; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; -/** - * {@link StreamSource} utilities, that bridge incompatibilities between Flink releases. - * - *

This change is becauses RecordWriter is wrapped in RecordWriterDelegate in 1.10, please refer - * to https://github.com/apache/flink/commit/2c8b4ef572f05bf4740b7e204af1e5e709cd945c for more - * details. - */ +/** {@link StreamSource} utilities, that bridge incompatibilities between Flink releases. */ public class StreamSources { - /** - * Backward compatibility helper for {@link OneInputTransformation} `getInput` method, that has - * been removed in Flink 1.12. - * - * @param source Source to get single input from. - * @return Input transformation. - */ - public static Transformation getOnlyInput(OneInputTransformation source) { - return Iterables.getOnlyElement(source.getInputs()); - } - public static > void run( StreamSource streamSource, Object lockingObject, diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTest.java index c02a43275757..379d6ee01510 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTest.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.flink; import static org.hamcrest.CoreMatchers.allOf; +import static org.junit.Assert.assertThrows; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; @@ -25,32 +26,38 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.flink.client.program.OptimizerPlanEnvironment; import org.apache.flink.client.program.PackagedProgram; +import org.apache.flink.client.program.PackagedProgramUtils; import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.Configuration; import org.hamcrest.MatcherAssert; import org.hamcrest.core.StringContains; -import org.junit.Assert; import org.junit.Test; /** Test for {@link FlinkRunner}. */ -public class FlinkRunnerTest extends FlinkRunnerTestCompat { +public class FlinkRunnerTest { @Test public void testEnsureStdoutStdErrIsRestored() throws Exception { - PackagedProgram packagedProgram = getPackagedProgram(); - OptimizerPlanEnvironment env = getOptimizerPlanEnvironment(); - try { - // Flink will throw an error because no job graph will be generated by the main method - getPipeline(env, packagedProgram); - Assert.fail("This should have failed to create the Flink Plan."); - } catch (ProgramInvocationException e) { - // Test that Flink wasn't able to intercept the stdout/stderr and we printed to the regular - // output instead - MatcherAssert.assertThat( - e.getMessage(), - allOf( - StringContains.containsString("System.out: (none)"), - StringContains.containsString("System.err: (none)"))); - } + PackagedProgram packagedProgram = + PackagedProgram.newBuilder().setEntryPointClassName(getClass().getName()).build(); + int parallelism = Runtime.getRuntime().availableProcessors(); + OptimizerPlanEnvironment env = + new OptimizerPlanEnvironment(new Configuration(), getClass().getClassLoader(), parallelism); + Exception e = + assertThrows( + ProgramInvocationException.class, + () -> { + // Flink will throw an error because no job graph will be generated by the main method + PackagedProgramUtils.getPipelineFromProgram( + packagedProgram, env.getConfiguration(), parallelism, true); + }); + // Test that Flink wasn't able to intercept the stdout/stderr and we printed to the regular + // output instead + MatcherAssert.assertThat( + e.getMessage(), + allOf( + StringContains.containsString("System.out: (none)"), + StringContains.containsString("System.err: (none)"))); } /** Main method for {@code testEnsureStdoutStdErrIsRestored()}. */ diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java index 2cf6d6d13fe4..87de3414790d 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java @@ -30,7 +30,6 @@ import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.flink.FlinkStreamingTransformTranslators.UnboundedSourceWrapperNoValueWithRecordId; -import org.apache.beam.runners.flink.streaming.StreamSources; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; @@ -48,7 +47,9 @@ import org.apache.beam.sdk.values.PValues; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.transformations.LegacySourceTransformation; import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.checkerframework.checker.nullness.qual.Nullable; import org.junit.Test; @@ -77,7 +78,7 @@ public void readSourceTranslatorBoundedWithMaxParallelism() { UnboundedSourceWrapperNoValueWithRecordId source = (UnboundedSourceWrapperNoValueWithRecordId) - SourceTransformationCompat.getOperator(sourceTransform).getUserFunction(); + ((LegacySourceTransformation) sourceTransform).getOperator().getUserFunction(); assertEquals(maxParallelism, source.getUnderlyingSource().getSplitSources().size()); } @@ -97,7 +98,7 @@ public void readSourceTranslatorBoundedWithoutMaxParallelism() { UnboundedSourceWrapperNoValueWithRecordId source = (UnboundedSourceWrapperNoValueWithRecordId) - SourceTransformationCompat.getOperator(sourceTransform).getUserFunction(); + ((LegacySourceTransformation) sourceTransform).getOperator().getUserFunction(); assertEquals(parallelism, source.getUnderlyingSource().getSplitSources().size()); } @@ -120,7 +121,8 @@ public void readSourceTranslatorUnboundedWithMaxParallelism() { UnboundedSourceWrapper source = (UnboundedSourceWrapper) - SourceTransformationCompat.getOperator(StreamSources.getOnlyInput(sourceTransform)) + ((LegacySourceTransformation) Iterables.getOnlyElement(sourceTransform.getInputs())) + .getOperator() .getUserFunction(); assertEquals(maxParallelism, source.getSplitSources().size()); @@ -142,7 +144,8 @@ public void readSourceTranslatorUnboundedWithoutMaxParallelism() { UnboundedSourceWrapper source = (UnboundedSourceWrapper) - SourceTransformationCompat.getOperator(StreamSources.getOnlyInput(sourceTransform)) + ((LegacySourceTransformation) Iterables.getOnlyElement(sourceTransform.getInputs())) + .getOperator() .getUserFunction(); assertEquals(parallelism, source.getSplitSources().size()); diff --git a/settings.gradle.kts b/settings.gradle.kts index 55a0c85ba914..fd3f1b221483 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -72,10 +72,6 @@ include(":runners:core-java") include(":runners:direct-java") include(":runners:extensions-java:metrics") /* Begin Flink Runner related settings */ -// Flink 1.11 -include(":runners:flink:1.11") -include(":runners:flink:1.11:job-server") -include(":runners:flink:1.11:job-server-container") // Flink 1.12 include(":runners:flink:1.12") include(":runners:flink:1.12:job-server")