From 05b3fd3b0f54ffeaf91e034af071ac620297b7d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Fri, 12 Mar 2021 08:00:24 +0100 Subject: [PATCH] [BEAM-11948] Drop support for Flink 1.8 and 1.9 --- CHANGES.md | 1 + gradle.properties | 2 +- runners/flink/1.10/build.gradle | 4 +- .../beam/runners/flink/FlinkCapabilities.java | 34 ---------- .../AbstractStreamOperatorCompat.java | 0 .../runners/flink/RemoteMiniClusterImpl.java | 0 .../flink/SourceTransformationCompat.java | 0 runners/flink/1.11/build.gradle | 4 +- runners/flink/1.12/build.gradle | 4 +- runners/flink/1.8/build.gradle | 34 ---------- .../1.8/job-server-container/build.gradle | 26 -------- runners/flink/1.8/job-server/build.gradle | 31 ---------- .../beam/runners/flink/FlinkCapabilities.java | 34 ---------- .../streaming/io/BeamStoppableFunction.java | 29 --------- .../runners/flink/FlinkRunnerTestCompat.java | 43 ------------- .../flink/streaming/StreamSources.java | 50 --------------- runners/flink/1.9/build.gradle | 33 ---------- .../1.9/job-server-container/build.gradle | 26 -------- runners/flink/1.9/job-server/build.gradle | 31 ---------- .../flink/streaming/StreamSources.java | 62 ------------------- .../flink/FlinkBatchTransformTranslators.java | 10 +-- .../runners/flink/FlinkPipelineRunner.java | 8 --- .../beam/runners/flink/FlinkRunner.java | 7 --- .../functions/FlinkDoFnFunction.java | 12 +--- .../types/CoderTypeSerializer.java | 0 .../types/EncodedValueSerializer.java | 0 .../streaming/io/BeamStoppableFunction.java | 0 .../flink/batch/NonMergingGroupByKeyTest.java | 5 -- .../types/CoderTypeSerializerTest.java | 0 settings.gradle.kts | 8 --- .../content/en/documentation/runners/flink.md | 25 ++++++-- 31 files changed, 29 insertions(+), 494 deletions(-) delete mode 100644 runners/flink/1.10/src/main/java/org/apache/beam/runners/flink/FlinkCapabilities.java rename runners/flink/{1.8 => 1.10}/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java (100%) rename runners/flink/{1.8 => 1.10}/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java (100%) rename runners/flink/{1.8 => 1.10}/src/test/java/org/apache/beam/runners/flink/SourceTransformationCompat.java (100%) delete mode 100644 runners/flink/1.8/build.gradle delete mode 100644 runners/flink/1.8/job-server-container/build.gradle delete mode 100644 runners/flink/1.8/job-server/build.gradle delete mode 100644 runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/FlinkCapabilities.java delete mode 100644 runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BeamStoppableFunction.java delete mode 100644 runners/flink/1.8/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTestCompat.java delete mode 100644 runners/flink/1.8/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java delete mode 100644 runners/flink/1.9/build.gradle delete mode 100644 runners/flink/1.9/job-server-container/build.gradle delete mode 100644 runners/flink/1.9/job-server/build.gradle delete mode 100644 runners/flink/1.9/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java rename runners/flink/{1.8 => }/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java (100%) rename runners/flink/{1.8 => }/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java (100%) rename runners/flink/{1.9 => }/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BeamStoppableFunction.java (100%) rename runners/flink/{1.8 => }/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java (100%) diff --git a/CHANGES.md b/CHANGES.md index 483518f0e785..8d7740e444e3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -66,6 +66,7 @@ ## Breaking Changes * X behavior was changed ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). +* Drop support for Flink 1.8 and 1.9 ([BEAM-11948](https://issues.apache.org/jira/browse/BEAM-11948)). ## Deprecations diff --git a/gradle.properties b/gradle.properties index 364445a94b96..4255ef8c27fd 100644 --- a/gradle.properties +++ b/gradle.properties @@ -32,5 +32,5 @@ javaVersion=1.8 docker_image_default_repo_root=apache docker_image_default_repo_prefix=beam_ -flink_versions=1.8,1.9,1.10,1.11,1.12 +flink_versions=1.10,1.11,1.12 diff --git a/runners/flink/1.10/build.gradle b/runners/flink/1.10/build.gradle index 5a79c3e3ecb9..7c353ecf4ba2 100644 --- a/runners/flink/1.10/build.gradle +++ b/runners/flink/1.10/build.gradle @@ -22,8 +22,8 @@ project.ext { // Set the version of all Flink-related dependencies here. flink_version = '1.10.1' // Version specific code overrides. - main_source_overrides = ["${basePath}/1.8/src/main/java", "${basePath}/1.9/src/main/java", './src/main/java'] - test_source_overrides = ["${basePath}/1.8/src/test/java", "${basePath}/1.9/src/test/java", './src/test/java'] + main_source_overrides = ['./src/main/java'] + test_source_overrides = ['./src/test/java'] main_resources_overrides = [] test_resources_overrides = [] archives_base_name = 'beam-runners-flink-1.10' diff --git a/runners/flink/1.10/src/main/java/org/apache/beam/runners/flink/FlinkCapabilities.java b/runners/flink/1.10/src/main/java/org/apache/beam/runners/flink/FlinkCapabilities.java deleted file mode 100644 index 1b56c72946a9..000000000000 --- a/runners/flink/1.10/src/main/java/org/apache/beam/runners/flink/FlinkCapabilities.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -/** Handle different capabilities between flink versions. */ -public class FlinkCapabilities { - - /** - * Support for outputting elements in close method of chained drivers. - * - *

{@see FLINK-14709} for more - * details. - * - * @return True if feature is supported. - */ - public static boolean supportsOutputDuringClosing() { - return true; - } -} diff --git a/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java b/runners/flink/1.10/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java similarity index 100% rename from runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java rename to runners/flink/1.10/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java diff --git a/runners/flink/1.8/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java b/runners/flink/1.10/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java similarity index 100% rename from runners/flink/1.8/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java rename to runners/flink/1.10/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java diff --git a/runners/flink/1.8/src/test/java/org/apache/beam/runners/flink/SourceTransformationCompat.java b/runners/flink/1.10/src/test/java/org/apache/beam/runners/flink/SourceTransformationCompat.java similarity index 100% rename from runners/flink/1.8/src/test/java/org/apache/beam/runners/flink/SourceTransformationCompat.java rename to runners/flink/1.10/src/test/java/org/apache/beam/runners/flink/SourceTransformationCompat.java diff --git a/runners/flink/1.11/build.gradle b/runners/flink/1.11/build.gradle index 229d56917cef..7927adcb2df6 100644 --- a/runners/flink/1.11/build.gradle +++ b/runners/flink/1.11/build.gradle @@ -22,8 +22,8 @@ project.ext { // Set the version of all Flink-related dependencies here. flink_version = '1.11.3' // Version specific code overrides. - main_source_overrides = ["${basePath}/1.8/src/main/java", "${basePath}/1.9/src/main/java", "${basePath}/1.10/src/main/java", './src/main/java'] - test_source_overrides = ["${basePath}/1.8/src/test/java", "${basePath}/1.9/src/test/java", "${basePath}/1.10/src/test/java", './src/test/java'] + main_source_overrides = ["${basePath}/1.10/src/main/java", './src/main/java'] + test_source_overrides = ["${basePath}/1.10/src/test/java", './src/test/java'] main_resources_overrides = [] test_resources_overrides = [] archives_base_name = 'beam-runners-flink-1.11' diff --git a/runners/flink/1.12/build.gradle b/runners/flink/1.12/build.gradle index fbf22058b6d3..6db1ccb22517 100644 --- a/runners/flink/1.12/build.gradle +++ b/runners/flink/1.12/build.gradle @@ -22,8 +22,8 @@ project.ext { // Set the version of all Flink-related dependencies here. flink_version = '1.12.2' // Version specific code overrides. - main_source_overrides = ["${basePath}/1.8/src/main/java", "${basePath}/1.9/src/main/java", "${basePath}/1.10/src/main/java", "${basePath}/1.11/src/main/java", './src/main/java'] - test_source_overrides = ["${basePath}/1.8/src/test/java", "${basePath}/1.9/src/test/java", "${basePath}/1.10/src/test/java", "${basePath}/1.11/src/test/java", './src/test/java'] + main_source_overrides = ["${basePath}/1.10/src/main/java", "${basePath}/1.11/src/main/java", './src/main/java'] + test_source_overrides = ["${basePath}/1.10/src/test/java", "${basePath}/1.11/src/test/java", './src/test/java'] main_resources_overrides = [] test_resources_overrides = [] archives_base_name = 'beam-runners-flink-1.12' diff --git a/runners/flink/1.8/build.gradle b/runners/flink/1.8/build.gradle deleted file mode 100644 index 5489aec11194..000000000000 --- a/runners/flink/1.8/build.gradle +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -def basePath = '..' - -/* All properties required for loading the Flink build script */ -project.ext { - // Set the version of all Flink-related dependencies here. - flink_version = '1.8.3' - // Version specific code overrides. - main_source_overrides = ['./src/main/java'] - test_source_overrides = ['./src/test/java'] - main_resources_overrides = [] - test_resources_overrides = [] - archives_base_name = 'beam-runners-flink-1.8' -} - -// Load the main build script which contains all build logic. -apply from: "$basePath/flink_runner.gradle" diff --git a/runners/flink/1.8/job-server-container/build.gradle b/runners/flink/1.8/job-server-container/build.gradle deleted file mode 100644 index afdb68a0fc91..000000000000 --- a/runners/flink/1.8/job-server-container/build.gradle +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -def basePath = '../../job-server-container' - -project.ext { - resource_path = basePath -} - -// Load the main build script which contains all build logic. -apply from: "$basePath/flink_job_server_container.gradle" diff --git a/runners/flink/1.8/job-server/build.gradle b/runners/flink/1.8/job-server/build.gradle deleted file mode 100644 index 562b6ca6480d..000000000000 --- a/runners/flink/1.8/job-server/build.gradle +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -def basePath = '../../job-server' - -project.ext { - // Look for the source code in the parent module - main_source_dirs = ["$basePath/src/main/java"] - test_source_dirs = ["$basePath/src/test/java"] - main_resources_dirs = ["$basePath/src/main/resources"] - test_resources_dirs = ["$basePath/src/test/resources"] - archives_base_name = 'beam-runners-flink-1.8-job-server' -} - -// Load the main build script which contains all build logic. -apply from: "$basePath/flink_job_server.gradle" diff --git a/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/FlinkCapabilities.java b/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/FlinkCapabilities.java deleted file mode 100644 index e1d2a44f9597..000000000000 --- a/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/FlinkCapabilities.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -/** Handle different capabilities between flink versions. */ -public class FlinkCapabilities { - - /** - * Support for outputting elements in close method of chained drivers. - * - *

{@see FLINK-14709} for more - * details. - * - * @return True if feature is supported. - */ - public static boolean supportsOutputDuringClosing() { - return false; - } -} diff --git a/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BeamStoppableFunction.java b/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BeamStoppableFunction.java deleted file mode 100644 index 25eafd7ac114..000000000000 --- a/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BeamStoppableFunction.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming.io; - -import org.apache.flink.api.common.functions.StoppableFunction; - -/** - * Custom StoppableFunction for backward compatibility. - * - * @see Flink - * interface removal commit. - */ -public interface BeamStoppableFunction extends StoppableFunction {} diff --git a/runners/flink/1.8/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTestCompat.java b/runners/flink/1.8/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTestCompat.java deleted file mode 100644 index 0dc208a18434..000000000000 --- a/runners/flink/1.8/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTestCompat.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import org.apache.flink.client.program.OptimizerPlanEnvironment; -import org.apache.flink.client.program.PackagedProgram; -import org.apache.flink.client.program.ProgramInvocationException; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.optimizer.Optimizer; - -/** - * Compatibility layer for {@link PackagedProgram} and {@link OptimizerPlanEnvironment} breaking - * changes. - */ -public abstract class FlinkRunnerTestCompat { - public PackagedProgram getPackagedProgram() throws ProgramInvocationException { - return new PackagedProgram(getClass()); - } - - public OptimizerPlanEnvironment getOptimizerPlanEnvironment() { - return new OptimizerPlanEnvironment(new Optimizer(new Configuration())); - } - - public void getPipeline(OptimizerPlanEnvironment env, PackagedProgram packagedProgram) - throws ProgramInvocationException { - env.getOptimizedPlan(packagedProgram); - } -} diff --git a/runners/flink/1.8/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java b/runners/flink/1.8/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java deleted file mode 100644 index c148c3fc1c06..000000000000 --- a/runners/flink/1.8/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.streaming; - -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.api.operators.StreamSource; -import org.apache.flink.streaming.api.transformations.OneInputTransformation; -import org.apache.flink.streaming.api.transformations.StreamTransformation; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; - -/** {@link StreamSource} utilities, that bridge incompatibilities between Flink releases. */ -public class StreamSources { - - /** - * Backward compatibility helper for {@link OneInputTransformation} `getInput` method, that has - * been removed in Flink 1.12. - * - * @param source Source to get single input from. - * @return Input transformation. - */ - public static StreamTransformation getOnlyInput(OneInputTransformation source) { - return source.getInput(); - } - - public static > void run( - StreamSource streamSource, - Object lockingObject, - StreamStatusMaintainer streamStatusMaintainer, - Output> collector) - throws Exception { - streamSource.run(lockingObject, streamStatusMaintainer, collector); - } -} diff --git a/runners/flink/1.9/build.gradle b/runners/flink/1.9/build.gradle deleted file mode 100644 index 0c2c01253e78..000000000000 --- a/runners/flink/1.9/build.gradle +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -def basePath = '..' -/* All properties required for loading the Flink build script */ -project.ext { - // Set the version of all Flink-related dependencies here. - flink_version = '1.9.3' - // Version specific code overrides. - main_source_overrides = ["${basePath}/1.8/src/main/java", './src/main/java'] - test_source_overrides = ["${basePath}/1.8/src/test/java", './src/test/java'] - main_resources_overrides = [] - test_resources_overrides = [] - archives_base_name = 'beam-runners-flink-1.9' -} - -// Load the main build script which contains all build logic. -apply from: "$basePath/flink_runner.gradle" diff --git a/runners/flink/1.9/job-server-container/build.gradle b/runners/flink/1.9/job-server-container/build.gradle deleted file mode 100644 index afdb68a0fc91..000000000000 --- a/runners/flink/1.9/job-server-container/build.gradle +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -def basePath = '../../job-server-container' - -project.ext { - resource_path = basePath -} - -// Load the main build script which contains all build logic. -apply from: "$basePath/flink_job_server_container.gradle" diff --git a/runners/flink/1.9/job-server/build.gradle b/runners/flink/1.9/job-server/build.gradle deleted file mode 100644 index b094ddac437c..000000000000 --- a/runners/flink/1.9/job-server/build.gradle +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -def basePath = '../../job-server' - -project.ext { - // Look for the source code in the parent module - main_source_dirs = ["$basePath/src/main/java"] - test_source_dirs = ["$basePath/src/test/java"] - main_resources_dirs = ["$basePath/src/main/resources"] - test_resources_dirs = ["$basePath/src/test/resources"] - archives_base_name = 'beam-runners-flink-1.9-job-server' -} - -// Load the main build script which contains all build logic. -apply from: "$basePath/flink_job_server.gradle" diff --git a/runners/flink/1.9/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java b/runners/flink/1.9/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java deleted file mode 100644 index bfbd89590958..000000000000 --- a/runners/flink/1.9/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.streaming; - -import org.apache.flink.api.dag.Transformation; -import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.api.operators.StreamSource; -import org.apache.flink.streaming.api.transformations.OneInputTransformation; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; -import org.apache.flink.streaming.runtime.tasks.OperatorChain; -import org.apache.flink.streaming.runtime.tasks.StreamTask; - -/** {@link StreamSource} utilities, that bridge incompatibilities between Flink releases. */ -public class StreamSources { - - /** - * Backward compatibility helper for {@link OneInputTransformation} `getInput` method, that has - * been removed in Flink 1.12. - * - * @param source Source to get single input from. - * @return Input transformation. - */ - public static Transformation getOnlyInput(OneInputTransformation source) { - return source.getInput(); - } - - public static > void run( - StreamSource streamSource, - Object lockingObject, - StreamStatusMaintainer streamStatusMaintainer, - Output> collector) - throws Exception { - streamSource.run( - lockingObject, streamStatusMaintainer, collector, createOperatorChain(streamSource)); - } - - private static OperatorChain createOperatorChain(AbstractStreamOperator operator) { - return new OperatorChain<>( - operator.getContainingTask(), - StreamTask.createRecordWriters( - operator.getOperatorConfig(), new MockEnvironmentBuilder().build())); - } -} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java index 517aada383f4..c2826e0dbd14 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java @@ -95,7 +95,6 @@ import org.apache.flink.api.java.operators.GroupReduceOperator; import org.apache.flink.api.java.operators.Grouping; import org.apache.flink.api.java.operators.MapOperator; -import org.apache.flink.api.java.operators.MapPartitionOperator; import org.apache.flink.api.java.operators.SingleInputUdfOperator; import org.apache.flink.api.java.operators.UnsortedGrouping; import org.apache.flink.configuration.Configuration; @@ -772,14 +771,7 @@ public void translateNode( doFnSchemaInformation, sideInputMapping); - if (FlinkCapabilities.supportsOutputDuringClosing()) { - outputDataSet = - new FlatMapOperator<>(inputDataSet, typeInformation, doFnWrapper, fullName); - } else { - // This can be removed once we drop support for 1.8 and 1.9 versions. - outputDataSet = - new MapPartitionOperator<>(inputDataSet, typeInformation, doFnWrapper, fullName); - } + outputDataSet = new FlatMapOperator<>(inputDataSet, typeInformation, doFnWrapper, fullName); } transformSideInputs(sideInputs, outputDataSet, context); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java index 8de4a27c91d5..96b8781cc035 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java @@ -46,7 +46,6 @@ import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.runtime.util.EnvironmentInformation; import org.checkerframework.checker.nullness.qual.Nullable; import org.kohsuke.args4j.CmdLineException; import org.kohsuke.args4j.CmdLineParser; @@ -76,13 +75,6 @@ public FlinkPipelineRunner( public PortablePipelineResult run(final Pipeline pipeline, JobInfo jobInfo) throws Exception { MetricsEnvironment.setMetricsSupported(false); - final String flinkVersion = EnvironmentInformation.getVersion(); - if (flinkVersion.startsWith("1.8") || flinkVersion.startsWith("1.9")) { - LOG.warn( - "You are running Flink {}. Support for Flink 1.8.x and 1.9.x will be removed from Beam in version 2.30.0. Please consider upgrading to a more recent Flink version.", - flinkVersion); - } - FlinkPortablePipelineTranslator translator; if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(pipeline)) { // TODO: Do we need to inspect for unbounded sources before fusing? diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java index c1180e887414..f63869123ec6 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java @@ -37,7 +37,6 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.util.EnvironmentInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,12 +79,6 @@ public PipelineResult run(Pipeline pipeline) { MetricsEnvironment.setMetricsSupported(true); - final String flinkVersion = EnvironmentInformation.getVersion(); - if (flinkVersion.startsWith("1.8") || flinkVersion.startsWith("1.9")) { - LOG.warn( - "You are running Flink {}. Support for Flink 1.8.x and 1.9.x will be removed from Beam in version 2.30.0. Please consider upgrading to a more recent Flink version.", - flinkVersion); - } LOG.info("Executing pipeline using FlinkRunner."); if (!options.getFasterCopy()) { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java index 8a9b8d8497fd..1ec4620a6203 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java @@ -43,7 +43,6 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.MapPartitionFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Collector; @@ -61,8 +60,7 @@ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) }) public class FlinkDoFnFunction extends AbstractRichFunction - implements FlatMapFunction, WindowedValue>, - MapPartitionFunction, WindowedValue> { + implements FlatMapFunction, WindowedValue> { private final SerializablePipelineOptions serializedOptions; @@ -128,14 +126,6 @@ public void flatMap(WindowedValue value, Collector> values, Collector> out) { - for (WindowedValue value : values) { - flatMap(value, out); - } - } - @Override public void open(Configuration parameters) { // Note that the SerializablePipelineOptions already initialize FileSystems in the readObject() diff --git a/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java similarity index 100% rename from runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java diff --git a/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java similarity index 100% rename from runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java diff --git a/runners/flink/1.9/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BeamStoppableFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BeamStoppableFunction.java similarity index 100% rename from runners/flink/1.9/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BeamStoppableFunction.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BeamStoppableFunction.java diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/batch/NonMergingGroupByKeyTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/batch/NonMergingGroupByKeyTest.java index 935ca21e6bf0..a63480e78800 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/batch/NonMergingGroupByKeyTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/batch/NonMergingGroupByKeyTest.java @@ -19,7 +19,6 @@ import java.util.Arrays; import java.util.Objects; -import org.apache.beam.runners.flink.FlinkCapabilities; import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.runners.flink.FlinkTestPipeline; import org.apache.beam.sdk.Pipeline; @@ -31,7 +30,6 @@ import org.apache.beam.sdk.values.KV; import org.apache.flink.test.util.AbstractTestBase; import org.junit.Assert; -import org.junit.Assume; import org.junit.Test; public class NonMergingGroupByKeyTest extends AbstractTestBase { @@ -48,9 +46,6 @@ public void processElement(@Element KV> el) { @Test public void testDisabledReIterationThrowsAnException() { - // If output during closing is not supported, we can not chain DoFns and results - // are therefore materialized during output serialization. - Assume.assumeTrue(FlinkCapabilities.supportsOutputDuringClosing()); final Pipeline p = FlinkTestPipeline.createForBatch(); p.apply(Create.of(Arrays.asList(KV.of("a", 1), KV.of("b", 2), KV.of("c", 3)))) .apply(GroupByKey.create()) diff --git a/runners/flink/1.8/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java similarity index 100% rename from runners/flink/1.8/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java diff --git a/settings.gradle.kts b/settings.gradle.kts index 7444fcfe5256..14619a4304a4 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -60,14 +60,6 @@ include(":runners:core-java") include(":runners:direct-java") include(":runners:extensions-java:metrics") /* Begin Flink Runner related settings */ -// Flink 1.8 -include(":runners:flink:1.8") -include(":runners:flink:1.8:job-server") -include(":runners:flink:1.8:job-server-container") -// Flink 1.9 -include(":runners:flink:1.9") -include(":runners:flink:1.9:job-server") -include(":runners:flink:1.9:job-server-container") // Flink 1.10 include(":runners:flink:1.10") include(":runners:flink:1.10:job-server") diff --git a/website/www/site/content/en/documentation/runners/flink.md b/website/www/site/content/en/documentation/runners/flink.md index e1f814c19177..4b5e5a445408 100644 --- a/website/www/site/content/en/documentation/runners/flink.md +++ b/website/www/site/content/en/documentation/runners/flink.md @@ -196,9 +196,9 @@ The optional `flink_version` option may be required as well for older versions o {{< paragraph class="language-portable" >}} Starting with Beam 2.18.0, pre-built Flink Job Service Docker images are available at Docker Hub: -[Flink 1.8](https://hub.docker.com/r/apache/beam_flink1.8_job_server), -[Flink 1.9](https://hub.docker.com/r/apache/beam_flink1.9_job_server). -[Flink 1.10](https://hub.docker.com/r/apache/beam_flink1.10_job_server). +[Flink 1.10](https://hub.docker.com/r/apache/beam_flink1.10_job_server), +[Flink 1.11](https://hub.docker.com/r/apache/beam_flink1.11_job_server), +[Flink 1.12](https://hub.docker.com/r/apache/beam_flink1.12_job_server). {{< /paragraph >}} @@ -311,8 +311,8 @@ reference. ## Flink Version Compatibility The Flink cluster version has to match the minor version used by the FlinkRunner. -The minor version is the first two numbers in the version string, e.g. in `1.8.0` the -minor version is `1.8`. +The minor version is the first two numbers in the version string, e.g. in `1.12.0` the +minor version is `1.12`. We try to track the latest version of Apache Flink at the time of the Beam release. A Flink version is supported by Beam for the time it is supported by the Flink community. @@ -326,7 +326,20 @@ To find out which version of Flink is compatible with Beam please see the table Artifact Id - ≥ 2.27.0 + ≥ 2.30.0 + 1.12.x * + beam-runners-flink-1.12 + + + 1.11.x * + beam-runners-flink-1.11 + + + 1.10.x + beam-runners-flink-1.10 + + + 2.27.0 - 2.29.0 1.12.x * beam-runners-flink-1.12