diff --git a/.test-infra/dataproc/flink_cluster.sh b/.test-infra/dataproc/flink_cluster.sh index 5d019a01d235..1a4384b1d0d4 100755 --- a/.test-infra/dataproc/flink_cluster.sh +++ b/.test-infra/dataproc/flink_cluster.sh @@ -35,7 +35,7 @@ # HARNESS_IMAGES_TO_PULL='gcr.io//python:latest gcr.io//java:latest' \ # JOB_SERVER_IMAGE=gcr.io//job-server-flink:latest \ # ARTIFACTS_DIR=gs:// \ -# FLINK_DOWNLOAD_URL=https://archive.apache.org/dist/flink/flink-1.10.1/flink-1.10.1-bin-scala_2.11.tgz \ +# FLINK_DOWNLOAD_URL=https://archive.apache.org/dist/flink/flink-1.12.3/flink-1.12.3-bin-scala_2.11.tgz \ # HADOOP_DOWNLOAD_URL=https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-9.0/flink-shaded-hadoop-2-uber-2.8.3-9.0.jar \ # FLINK_NUM_WORKERS=2 \ # FLINK_TASKMANAGER_SLOTS=1 \ diff --git a/.test-infra/jenkins/Flink.groovy b/.test-infra/jenkins/Flink.groovy index 53f11fc1b334..40dfa0377175 100644 --- a/.test-infra/jenkins/Flink.groovy +++ b/.test-infra/jenkins/Flink.groovy @@ -17,8 +17,8 @@ */ class Flink { - private static final String flinkDownloadUrl = 'https://archive.apache.org/dist/flink/flink-1.10.1/flink-1.10.1-bin-scala_2.11.tgz' - private static final String hadoopDownloadUrl = 'https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-9.0/flink-shaded-hadoop-2-uber-2.8.3-9.0.jar' + private static final String flinkDownloadUrl = 'https://archive.apache.org/dist/flink/flink-1.12.3/flink-1.12.3-bin-scala_2.11.tgz' + private static final String hadoopDownloadUrl = 'https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar' private static final String FLINK_DIR = '"$WORKSPACE/src/.test-infra/dataproc"' private static final String FLINK_SCRIPT = 'flink_cluster.sh' private def job diff --git a/.test-infra/jenkins/job_LoadTests_Combine_Flink_Go.groovy b/.test-infra/jenkins/job_LoadTests_Combine_Flink_Go.groovy index 8777788cb433..0143121caaa9 100644 --- a/.test-infra/jenkins/job_LoadTests_Combine_Flink_Go.groovy +++ b/.test-infra/jenkins/job_LoadTests_Combine_Flink_Go.groovy @@ -105,7 +105,7 @@ def loadTestJob = { scope, triggeringContext, mode -> "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest" ], initialParallelism, - "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.10_job_server:latest") + "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.12_job_server:latest") // Execute all scenarios connected with initial parallelism. loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.GO, initialScenarios, 'combine', mode) diff --git a/.test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy b/.test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy index afe86594f2df..5055d1a6982f 100644 --- a/.test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy +++ b/.test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy @@ -132,7 +132,7 @@ def loadTestJob = { scope, triggeringContext, mode -> "${DOCKER_CONTAINER_REGISTRY}/${DOCKER_BEAM_SDK_IMAGE}" ], initialParallelism, - "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.10_job_server:latest") + "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.12_job_server:latest") // Execute all scenarios connected with initial parallelism. loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.PYTHON, initialScenarios, 'Combine', mode) diff --git a/.test-infra/jenkins/job_LoadTests_GBK_Flink_Go.groovy b/.test-infra/jenkins/job_LoadTests_GBK_Flink_Go.groovy index 890946eb0a7f..fee2e7d29d7a 100644 --- a/.test-infra/jenkins/job_LoadTests_GBK_Flink_Go.groovy +++ b/.test-infra/jenkins/job_LoadTests_GBK_Flink_Go.groovy @@ -198,7 +198,7 @@ def loadTestJob = { scope, triggeringContext, mode -> "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest" ], initialParallelism, - "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.10_job_server:latest") + "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.12_job_server:latest") // Execute all scenarios connected with initial parallelism. loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.GO, initialScenarios, 'group_by_key', mode) diff --git a/.test-infra/jenkins/job_LoadTests_GBK_Flink_Python.groovy b/.test-infra/jenkins/job_LoadTests_GBK_Flink_Python.groovy index 6fa8df4cb681..1b7c1f9a807d 100644 --- a/.test-infra/jenkins/job_LoadTests_GBK_Flink_Python.groovy +++ b/.test-infra/jenkins/job_LoadTests_GBK_Flink_Python.groovy @@ -146,7 +146,7 @@ def loadTest = { scope, triggeringContext -> "${DOCKER_CONTAINER_REGISTRY}/${DOCKER_BEAM_SDK_IMAGE}" ], numberOfWorkers, - "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.10_job_server:latest") + "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.12_job_server:latest") def configurations = testScenarios.findAll { it.pipelineOptions?.parallelism?.value == numberOfWorkers } loadTestsBuilder.loadTests(scope, sdk, configurations, "GBK", "batch") diff --git a/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Go.groovy b/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Go.groovy index 17d8d5c8e5e5..567e3276c2b6 100644 --- a/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Go.groovy +++ b/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Go.groovy @@ -126,7 +126,7 @@ def loadTestJob = { scope, triggeringContext, mode -> "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest" ], numberOfWorkers, - "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.10_job_server:latest") + "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.12_job_server:latest") loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.GO, batchScenarios(), 'ParDo', mode) } diff --git a/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Python.groovy b/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Python.groovy index ac87b793138a..9b80c61fc726 100644 --- a/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Python.groovy +++ b/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Python.groovy @@ -320,7 +320,7 @@ def loadTestJob = { scope, triggeringContext, mode -> "${DOCKER_CONTAINER_REGISTRY}/${DOCKER_BEAM_SDK_IMAGE}" ], numberOfWorkers, - "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.10_job_server:latest") + "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.12_job_server:latest") loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.PYTHON, testScenarios, 'ParDo', mode) } diff --git a/.test-infra/jenkins/job_LoadTests_SideInput_Flink_Go.groovy b/.test-infra/jenkins/job_LoadTests_SideInput_Flink_Go.groovy index 60045a7beaa8..c2db4836df33 100644 --- a/.test-infra/jenkins/job_LoadTests_SideInput_Flink_Go.groovy +++ b/.test-infra/jenkins/job_LoadTests_SideInput_Flink_Go.groovy @@ -78,7 +78,7 @@ def loadTestJob = { scope, triggeringContext, mode -> "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest" ], numberOfWorkers, - "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.10_job_server:latest") + "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.12_job_server:latest") loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.GO, batchScenarios(), 'SideInput', mode) diff --git a/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Go.groovy b/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Go.groovy index d5463b0a0fa8..b15c47fe6f18 100644 --- a/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Go.groovy +++ b/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Go.groovy @@ -158,7 +158,7 @@ def loadTestJob = { scope, triggeringContext, mode -> "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest" ], numberOfWorkers, - "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.10_job_server:latest") + "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.12_job_server:latest") loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.GO, batchScenarios(), 'CoGBK', mode) } diff --git a/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Python.groovy b/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Python.groovy index 5b9a21946049..68dd2113e7a8 100644 --- a/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Python.groovy +++ b/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Python.groovy @@ -137,7 +137,7 @@ def loadTest = { scope, triggeringContext -> "${DOCKER_CONTAINER_REGISTRY}/${DOCKER_BEAM_SDK_IMAGE}" ], numberOfWorkers, - "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.10_job_server:latest") + "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.12_job_server:latest") loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.PYTHON, testScenarios, 'CoGBK', 'batch') } diff --git a/.test-infra/jenkins/job_PostCommit_Python_Chicago_Taxi_Example_Flink.groovy b/.test-infra/jenkins/job_PostCommit_Python_Chicago_Taxi_Example_Flink.groovy index 34d81f2b2ec2..54f8cd949141 100644 --- a/.test-infra/jenkins/job_PostCommit_Python_Chicago_Taxi_Example_Flink.groovy +++ b/.test-infra/jenkins/job_PostCommit_Python_Chicago_Taxi_Example_Flink.groovy @@ -38,7 +38,7 @@ def chicagoTaxiJob = { scope -> "${DOCKER_CONTAINER_REGISTRY}/${beamSdkDockerImage}" ], numberOfWorkers, - "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.10_job_server:latest") + "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.12_job_server:latest") def pipelineOptions = [ parallelism : numberOfWorkers, diff --git a/CHANGES.md b/CHANGES.md index 2b0781e74a91..182f42dedcde 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -75,6 +75,7 @@ * Kafka Beam SQL tables now ascribe meaning to the LOCATION field; previously it was ignored if provided. * `TopCombineFn` disallow `compare` as its argument (Python) ([BEAM-7372](https://issues.apache.org/jira/browse/BEAM-7372)). +* Drop support for Flink 1.10 ([BEAM-12281](https://issues.apache.org/jira/browse/BEAM-12281)). ## Deprecations diff --git a/gradle.properties b/gradle.properties index 953086f0c5f8..1d7255079f64 100644 --- a/gradle.properties +++ b/gradle.properties @@ -32,5 +32,5 @@ javaVersion=1.8 docker_image_default_repo_root=apache docker_image_default_repo_prefix=beam_ -flink_versions=1.10,1.11,1.12,1.13 +flink_versions=1.11,1.12,1.13 diff --git a/release/build.gradle.kts b/release/build.gradle.kts index dc08e50cec91..99ab361ffbb6 100644 --- a/release/build.gradle.kts +++ b/release/build.gradle.kts @@ -39,7 +39,7 @@ task("runJavaExamplesValidationTask") { dependsOn(":runners:direct-java:runQuickstartJavaDirect") dependsOn(":runners:google-cloud-dataflow-java:runQuickstartJavaDataflow") dependsOn(":runners:spark:2:runQuickstartJavaSpark") - dependsOn(":runners:flink:1.10:runQuickstartJavaFlinkLocal") + dependsOn(":runners:flink:1.13:runQuickstartJavaFlinkLocal") dependsOn(":runners:direct-java:runMobileGamingJavaDirect") dependsOn(":runners:google-cloud-dataflow-java:runMobileGamingJavaDataflow") dependsOn(":runners:twister2:runQuickstartJavaTwister2") diff --git a/release/src/main/scripts/run_rc_validation.sh b/release/src/main/scripts/run_rc_validation.sh index 1f7b2838efb4..bad8ba5067db 100755 --- a/release/src/main/scripts/run_rc_validation.sh +++ b/release/src/main/scripts/run_rc_validation.sh @@ -199,7 +199,7 @@ if [[ "$java_quickstart_flink_local" = true ]]; then echo "*************************************************************" echo "* Running Java Quickstart with Flink local runner" echo "*************************************************************" - ./gradlew :runners:flink:1.10:runQuickstartJavaFlinkLocal \ + ./gradlew :runners:flink:1.13:runQuickstartJavaFlinkLocal \ -Prepourl=${REPO_URL} \ -Pver=${RELEASE_VER} else diff --git a/runners/flink/1.10/build.gradle b/runners/flink/1.10/build.gradle deleted file mode 100644 index 7c353ecf4ba2..000000000000 --- a/runners/flink/1.10/build.gradle +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -def basePath = '..' -/* All properties required for loading the Flink build script */ -project.ext { - // Set the version of all Flink-related dependencies here. - flink_version = '1.10.1' - // Version specific code overrides. - main_source_overrides = ['./src/main/java'] - test_source_overrides = ['./src/test/java'] - main_resources_overrides = [] - test_resources_overrides = [] - archives_base_name = 'beam-runners-flink-1.10' -} - -// Load the main build script which contains all build logic. -apply from: "$basePath/flink_runner.gradle" diff --git a/runners/flink/1.10/job-server-container/build.gradle b/runners/flink/1.10/job-server-container/build.gradle deleted file mode 100644 index afdb68a0fc91..000000000000 --- a/runners/flink/1.10/job-server-container/build.gradle +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -def basePath = '../../job-server-container' - -project.ext { - resource_path = basePath -} - -// Load the main build script which contains all build logic. -apply from: "$basePath/flink_job_server_container.gradle" diff --git a/runners/flink/1.10/job-server/build.gradle b/runners/flink/1.10/job-server/build.gradle deleted file mode 100644 index ee1e8c986970..000000000000 --- a/runners/flink/1.10/job-server/build.gradle +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -def basePath = '../../job-server' - -project.ext { - // Look for the source code in the parent module - main_source_dirs = ["$basePath/src/main/java"] - test_source_dirs = ["$basePath/src/test/java"] - main_resources_dirs = ["$basePath/src/main/resources"] - test_resources_dirs = ["$basePath/src/test/resources"] - archives_base_name = 'beam-runners-flink-1.10-job-server' -} - -// Load the main build script which contains all build logic. -apply from: "$basePath/flink_job_server.gradle" diff --git a/runners/flink/1.10/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java b/runners/flink/1.10/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java deleted file mode 100644 index cd7920d542da..000000000000 --- a/runners/flink/1.10/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming; - -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; - -/** Compatibility layer for {@link AbstractStreamOperator} breaking changes. */ -public abstract class AbstractStreamOperatorCompat - extends AbstractStreamOperator { - // timeServiceManager was made private behind a getter in Flink 1.11 - protected InternalTimeServiceManager getTimeServiceManagerCompat() { - return timeServiceManager; - } -} diff --git a/runners/flink/1.10/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTestCompat.java b/runners/flink/1.10/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTestCompat.java deleted file mode 100644 index 0852a13a9d67..000000000000 --- a/runners/flink/1.10/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTestCompat.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import org.apache.flink.client.program.OptimizerPlanEnvironment; -import org.apache.flink.client.program.PackagedProgram; -import org.apache.flink.client.program.ProgramInvocationException; -import org.apache.flink.configuration.Configuration; - -/** - * Compatibility layer for {@link PackagedProgram} and {@link OptimizerPlanEnvironment} breaking - * changes. - */ -public abstract class FlinkRunnerTestCompat { - public PackagedProgram getPackagedProgram() throws ProgramInvocationException { - return PackagedProgram.newBuilder().setEntryPointClassName(getClass().getName()).build(); - } - - public OptimizerPlanEnvironment getOptimizerPlanEnvironment() { - return new OptimizerPlanEnvironment(new Configuration()); - } - - public void getPipeline(OptimizerPlanEnvironment env, PackagedProgram packagedProgram) - throws ProgramInvocationException { - env.getPipeline(packagedProgram, false); - } -} diff --git a/runners/flink/1.10/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java b/runners/flink/1.10/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java deleted file mode 100644 index 1802983a41e6..000000000000 --- a/runners/flink/1.10/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import akka.actor.ActorSystem; -import com.typesafe.config.Config; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.minicluster.MiniCluster; -import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; -import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.akka.AkkaRpcService; -import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration; - -/** A {@link MiniCluster} which allows remote connections for the end-to-end test. */ -public class RemoteMiniClusterImpl extends RemoteMiniCluster { - - private int port; - - public RemoteMiniClusterImpl(MiniClusterConfiguration miniClusterConfiguration) { - super(miniClusterConfiguration); - } - - @Override - protected RpcService createRpcService( - AkkaRpcServiceConfiguration akkaRpcServiceConfig, boolean remoteEnabled, String bindAddress) { - - // Enable remote connections to the mini cluster which are disabled by default - final Config akkaConfig = - AkkaUtils.getAkkaConfig(akkaRpcServiceConfig.getConfiguration(), bindAddress, 0); - - final Config effectiveAkkaConfig = AkkaUtils.testDispatcherConfig().withFallback(akkaConfig); - - final ActorSystem actorSystem = AkkaUtils.createActorSystem(effectiveAkkaConfig); - - final AkkaRpcService akkaRpcService = new AkkaRpcService(actorSystem, akkaRpcServiceConfig); - this.port = akkaRpcService.getPort(); - - return akkaRpcService; - } - - @Override - public int getClusterPort() { - Preconditions.checkState(port > 0, "Port not yet initialized. Start the cluster first."); - return port; - } - - @Override - public int getRestPort() { - try { - return getRestAddress().get().getPort(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } -} diff --git a/runners/flink/1.10/src/test/java/org/apache/beam/runners/flink/SourceTransformationCompat.java b/runners/flink/1.10/src/test/java/org/apache/beam/runners/flink/SourceTransformationCompat.java deleted file mode 100644 index f7cc67cd5480..000000000000 --- a/runners/flink/1.10/src/test/java/org/apache/beam/runners/flink/SourceTransformationCompat.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import org.apache.flink.streaming.api.operators.StreamSource; -import org.apache.flink.streaming.api.transformations.SourceTransformation; - -/** Compatibility layer for {@link SourceTransformation} rename. */ -public class SourceTransformationCompat { - public static StreamSource getOperator(Object sourceTransform) { - return ((SourceTransformation) sourceTransform).getOperator(); - } -} diff --git a/runners/flink/1.11/build.gradle b/runners/flink/1.11/build.gradle index 7927adcb2df6..7124e2e6b59d 100644 --- a/runners/flink/1.11/build.gradle +++ b/runners/flink/1.11/build.gradle @@ -22,8 +22,8 @@ project.ext { // Set the version of all Flink-related dependencies here. flink_version = '1.11.3' // Version specific code overrides. - main_source_overrides = ["${basePath}/1.10/src/main/java", './src/main/java'] - test_source_overrides = ["${basePath}/1.10/src/test/java", './src/test/java'] + main_source_overrides = ['./src/main/java'] + test_source_overrides = ['./src/test/java'] main_resources_overrides = [] test_resources_overrides = [] archives_base_name = 'beam-runners-flink-1.11' diff --git a/runners/flink/1.10/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java b/runners/flink/1.11/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java similarity index 100% rename from runners/flink/1.10/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java rename to runners/flink/1.11/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java diff --git a/runners/flink/1.12/build.gradle b/runners/flink/1.12/build.gradle index 93ba0c3d5fe2..d38d87376ca6 100644 --- a/runners/flink/1.12/build.gradle +++ b/runners/flink/1.12/build.gradle @@ -22,8 +22,8 @@ project.ext { // Set the version of all Flink-related dependencies here. flink_version = '1.12.3' // Version specific code overrides. - main_source_overrides = ["${basePath}/1.10/src/main/java", "${basePath}/1.11/src/main/java", './src/main/java'] - test_source_overrides = ["${basePath}/1.10/src/test/java", "${basePath}/1.11/src/test/java", './src/test/java'] + main_source_overrides = ["${basePath}/1.11/src/main/java", './src/main/java'] + test_source_overrides = ["${basePath}/1.11/src/test/java", './src/test/java'] main_resources_overrides = [] test_resources_overrides = [] archives_base_name = 'beam-runners-flink-1.12' diff --git a/runners/flink/1.13/build.gradle b/runners/flink/1.13/build.gradle index ec865d7c5efb..0484c29322db 100644 --- a/runners/flink/1.13/build.gradle +++ b/runners/flink/1.13/build.gradle @@ -22,8 +22,8 @@ project.ext { // Set the version of all Flink-related dependencies here. flink_version = '1.13.0' // Version specific code overrides. - main_source_overrides = ["${basePath}/1.10/src/main/java", "${basePath}/1.11/src/main/java", "${basePath}/1.12/src/main/java", './src/main/java'] - test_source_overrides = ["${basePath}/1.10/src/test/java", "${basePath}/1.11/src/test/java", "${basePath}/1.12/src/test/java", './src/test/java'] + main_source_overrides = ["${basePath}/1.11/src/main/java", "${basePath}/1.12/src/main/java", './src/main/java'] + test_source_overrides = ["${basePath}/1.11/src/test/java", "${basePath}/1.12/src/test/java", './src/test/java'] main_resources_overrides = [] test_resources_overrides = [] archives_base_name = 'beam-runners-flink-1.13' diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle index 9fe8ef053f4d..9c8ee14c3480 100644 --- a/runners/flink/flink_runner.gradle +++ b/runners/flink/flink_runner.gradle @@ -266,7 +266,7 @@ tasks.register('validatesRunner') { dependsOn validatesRunnerStreamingCheckpointing } -// Generates :runners:flink:1.10:runQuickstartJavaFlinkLocal +// Generates :runners:flink:1.13:runQuickstartJavaFlinkLocal createJavaExamplesArchetypeValidationTask(type: 'Quickstart', runner: 'FlinkLocal') /** diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java index b9115675628b..86220a920a2a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java @@ -122,13 +122,8 @@ PortablePipelineResult runPipelineWithTranslator( private PortablePipelineResult createPortablePipelineResult( JobExecutionResult result, PipelineOptions options) { - // The package of DetachedJobExecutionResult has been changed in 1.10. - // Refer to https://github.com/apache/flink/commit/c36b35e6876ecdc717dade653e8554f9d8b543c9 for - // details. String resultClassName = result.getClass().getCanonicalName(); - if (resultClassName.equals( - "org.apache.flink.client.program.DetachedEnvironment.DetachedJobExecutionResult") - || resultClassName.equals("org.apache.flink.core.execution.DetachedJobExecutionResult")) { + if (resultClassName.equals("org.apache.flink.core.execution.DetachedJobExecutionResult")) { LOG.info("Pipeline submitted in Detached mode"); // no metricsPusher because metrics are not supported in detached mode return new FlinkPortableRunnerResult.Detached(); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java index 1470a2cbcba9..c4735b8f38d4 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java @@ -109,13 +109,8 @@ public PipelineResult run(Pipeline pipeline) { } static PipelineResult createPipelineResult(JobExecutionResult result, PipelineOptions options) { - // The package of DetachedJobExecutionResult has been changed in 1.10. - // Refer to https://github.com/apache/flink/commit/c36b35e6876ecdc717dade653e8554f9d8b543c9 for - // more details. String resultClassName = result.getClass().getCanonicalName(); - if (resultClassName.equals( - "org.apache.flink.client.program.DetachedEnvironment.DetachedJobExecutionResult") - || resultClassName.equals("org.apache.flink.core.execution.DetachedJobExecutionResult")) { + if (resultClassName.equals("org.apache.flink.core.execution.DetachedJobExecutionResult")) { LOG.info("Pipeline submitted in Detached mode"); // no metricsPusher because metrics are not supported in detached mode return new FlinkDetachedRunnerResult(); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java index cdd4865bdd0e..bafa1aa6f21f 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java @@ -35,7 +35,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; -import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment; @@ -45,7 +44,6 @@ import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.powermock.reflect.Whitebox; -import org.powermock.reflect.exceptions.FieldNotFoundException; /** Tests for {@link FlinkExecutionEnvironments}. */ public class FlinkExecutionEnvironmentsTest { @@ -524,30 +522,18 @@ public void shouldCreateRocksDbStateBackend() { } private void checkHostAndPort(Object env, String expectedHost, int expectedPort) { - try { - assertThat(Whitebox.getInternalState(env, "host"), is(expectedHost)); - assertThat(Whitebox.getInternalState(env, "port"), is(expectedPort)); - } catch (FieldNotFoundException t) { - // for flink 1.10+ - String host = - ((Configuration) Whitebox.getInternalState(env, "configuration")) - .getString(RestOptions.ADDRESS); - int port = - ((Configuration) Whitebox.getInternalState(env, "configuration")) - .getInteger(RestOptions.PORT); - assertThat( - new InetSocketAddress(host, port), is(new InetSocketAddress(expectedHost, expectedPort))); - } + String host = + ((Configuration) Whitebox.getInternalState(env, "configuration")) + .getString(RestOptions.ADDRESS); + int port = + ((Configuration) Whitebox.getInternalState(env, "configuration")) + .getInteger(RestOptions.PORT); + assertThat( + new InetSocketAddress(host, port), is(new InetSocketAddress(expectedHost, expectedPort))); } private String getSavepointPath(Object env) { - try { - return ((SavepointRestoreSettings) Whitebox.getInternalState(env, "savepointRestoreSettings")) - .getRestorePath(); - } catch (FieldNotFoundException t) { - // for flink 1.10+ - return ((Configuration) Whitebox.getInternalState(env, "configuration")) - .getString("execution.savepoint.path", null); - } + return ((Configuration) Whitebox.getInternalState(env, "configuration")) + .getString("execution.savepoint.path", null); } } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java index 0c9fda072c75..8f3bf96e08c6 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java @@ -72,7 +72,6 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.powermock.reflect.Whitebox; -import org.powermock.reflect.exceptions.FieldNotFoundException; /** Tests for {@link FlinkPipelineExecutionEnvironment}. */ @RunWith(JUnit4.class) @@ -431,18 +430,13 @@ private static List convertFilesToURLs(List filePaths) { } private List getJars(Object env) throws Exception { - try { - return (List) Whitebox.getInternalState(env, "jarFiles"); - } catch (FieldNotFoundException t) { - // for flink 1.10+ - Configuration config = Whitebox.getInternalState(env, "configuration"); - Class accesorClass = Class.forName("org.apache.flink.client.cli.ExecutionConfigAccessor"); - Method fromConfigurationMethod = - accesorClass.getDeclaredMethod("fromConfiguration", Configuration.class); - Object accesor = fromConfigurationMethod.invoke(null, config); - - Method getJarsMethod = accesorClass.getDeclaredMethod("getJars"); - return (List) getJarsMethod.invoke(accesor); - } + Configuration config = Whitebox.getInternalState(env, "configuration"); + Class accesorClass = Class.forName("org.apache.flink.client.cli.ExecutionConfigAccessor"); + Method fromConfigurationMethod = + accesorClass.getDeclaredMethod("fromConfiguration", Configuration.class); + Object accesor = fromConfigurationMethod.invoke(null, config); + + Method getJarsMethod = accesorClass.getDeclaredMethod("getJars"); + return (List) getJarsMethod.invoke(accesor); } } diff --git a/sdks/go/examples/stringsplit/stringsplit.go b/sdks/go/examples/stringsplit/stringsplit.go index 13a1905269c0..acbad7fd5c0c 100644 --- a/sdks/go/examples/stringsplit/stringsplit.go +++ b/sdks/go/examples/stringsplit/stringsplit.go @@ -20,7 +20,7 @@ // // 1. From a command line, navigate to the top-level beam/ directory and run // the Flink job server: -// ./gradlew :runners:flink:1.10:job-server:runShadow -Djob-host=localhost -Dflink-master=local +// ./gradlew :runners:flink:1.13:job-server:runShadow -Djob-host=localhost -Dflink-master=local // // 2. The job server is ready to receive jobs once it outputs a log like the // following: `JobService started on localhost:8099`. Take note of the endpoint diff --git a/sdks/java/testing/nexmark/build.gradle b/sdks/java/testing/nexmark/build.gradle index 2e8358dd1a8d..c831dbc7dafe 100644 --- a/sdks/java/testing/nexmark/build.gradle +++ b/sdks/java/testing/nexmark/build.gradle @@ -147,7 +147,7 @@ def getNexmarkArgs = { // // Parameters: // -Pnexmark.runner -// Specify a runner subproject, such as ":runners:spark:2" or ":runners:flink:1.10" +// Specify a runner subproject, such as ":runners:spark:2" or ":runners:flink:1.13" // Defaults to ":runners:direct-java" // // -Pnexmark.args diff --git a/sdks/java/testing/tpcds/build.gradle b/sdks/java/testing/tpcds/build.gradle index 79fb1e8878bc..a33f24c4f1b6 100644 --- a/sdks/java/testing/tpcds/build.gradle +++ b/sdks/java/testing/tpcds/build.gradle @@ -92,7 +92,7 @@ if (shouldProvideSpark) { // // Parameters: // -Ptpcds.runner -// Specify a runner subproject, such as ":runners:spark:2" or ":runners:flink:1.10" +// Specify a runner subproject, such as ":runners:spark:2" or ":runners:flink:1.13" // Defaults to ":runners:direct-java" // // -Ptpcds.args diff --git a/sdks/python/apache_beam/transforms/sql_test.py b/sdks/python/apache_beam/transforms/sql_test.py index e7a718f28047..9ec484963c8e 100644 --- a/sdks/python/apache_beam/transforms/sql_test.py +++ b/sdks/python/apache_beam/transforms/sql_test.py @@ -57,8 +57,8 @@ class SqlTransformTest(unittest.TestCase): job server. The easiest way to accomplish this is to run the `validatesCrossLanguageRunnerPythonUsingSql` gradle target for a particular job server, which will start the runner and job server for you. For example, - `:runners:flink:1.10:job-server:validatesCrossLanguageRunnerPythonUsingSql` to - test on Flink 1.10. + `:runners:flink:1.13:job-server:validatesCrossLanguageRunnerPythonUsingSql` to + test on Flink 1.13. Alternatively, you may be able to iterate faster if you run the tests directly using a runner like `FlinkRunner`, which can start a local Flink cluster and diff --git a/settings.gradle.kts b/settings.gradle.kts index a6605d95a052..6a8a8f2aad64 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -60,10 +60,6 @@ include(":runners:core-java") include(":runners:direct-java") include(":runners:extensions-java:metrics") /* Begin Flink Runner related settings */ -// Flink 1.10 -include(":runners:flink:1.10") -include(":runners:flink:1.10:job-server") -include(":runners:flink:1.10:job-server-container") // Flink 1.11 include(":runners:flink:1.11") include(":runners:flink:1.11:job-server") diff --git a/website/www/site/content/en/contribute/release-guide.md b/website/www/site/content/en/contribute/release-guide.md index 610522848e51..16ffa03a4aa8 100644 --- a/website/www/site/content/en/contribute/release-guide.md +++ b/website/www/site/content/en/contribute/release-guide.md @@ -882,7 +882,7 @@ _Note_: -Prepourl and -Pver can be found in the RC vote email sent by Release Ma ``` **Flink Local Runner** ``` - ./gradlew :runners:flink:1.10:runQuickstartJavaFlinkLocal \ + ./gradlew :runners:flink:1.13:runQuickstartJavaFlinkLocal \ -Prepourl=https://repository.apache.org/content/repositories/orgapachebeam-${KEY} \ -Pver=${RELEASE_VERSION} ``` diff --git a/website/www/site/content/en/documentation/dsls/sql/shell.md b/website/www/site/content/en/documentation/dsls/sql/shell.md index 5b755fc9ae93..7c7b31bac495 100644 --- a/website/www/site/content/en/documentation/dsls/sql/shell.md +++ b/website/www/site/content/en/documentation/dsls/sql/shell.md @@ -29,7 +29,7 @@ This page describes how to work with the shell, but does not focus on specific f To use Beam SQL shell, you must first clone the [Beam SDK repository](https://github.com/apache/beam). Then, from the root of the repository clone, execute the following commands to run the shell: ``` -./gradlew -p sdks/java/extensions/sql/shell -Pbeam.sql.shell.bundled=':runners:flink:1.10,:sdks:java:io:kafka' installDist +./gradlew -p sdks/java/extensions/sql/shell -Pbeam.sql.shell.bundled=':runners:flink:1.13,:sdks:java:io:kafka' installDist ./sdks/java/extensions/sql/shell/build/install/shell/bin/shell ``` @@ -117,7 +117,7 @@ By default, Beam uses the `DirectRunner` to run the pipeline on the machine wher 1. Make sure the SQL shell includes the desired runner. Add the corresponding project id to the `-Pbeam.sql.shell.bundled` parameter of the Gradle invocation ([source code](https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/shell/build.gradle), [project ids](https://github.com/apache/beam/blob/master/settings.gradle.kts)). For example, use the following command to include Flink runner and KafkaIO: ``` - ./gradlew -p sdks/java/extensions/sql/shell -Pbeam.sql.shell.bundled=':runners:flink:1.10,:sdks:java:io:kafka' installDist + ./gradlew -p sdks/java/extensions/sql/shell -Pbeam.sql.shell.bundled=':runners:flink:1.13,:sdks:java:io:kafka' installDist ``` _Note: You can bundle multiple runners (using a comma-separated list) or other additional components in the same manner. For example, you can add support for more I/Os._ @@ -143,7 +143,7 @@ To configure the runner, you must specify `PipelineOptions` by using the `SET` c You can also build your own standalone package for SQL shell using `distZip` or `distTar` tasks. For example: ``` -./gradlew -p sdks/java/extensions/sql/shell -Pbeam.sql.shell.bundled=':runners:flink:1.10,:sdks:java:io:kafka' distZip +./gradlew -p sdks/java/extensions/sql/shell -Pbeam.sql.shell.bundled=':runners:flink:1.13,:sdks:java:io:kafka' distZip ls ./sdks/java/extensions/sql/shell/build/distributions/ beam-sdks-java-extensions-sql-shell-2.6.0-SNAPSHOT.tar beam-sdks-java-extensions-sql-shell-2.6.0-SNAPSHOT.zip diff --git a/website/www/site/content/en/documentation/runners/flink.md b/website/www/site/content/en/documentation/runners/flink.md index bc242806736a..15610070360c 100644 --- a/website/www/site/content/en/documentation/runners/flink.md +++ b/website/www/site/content/en/documentation/runners/flink.md @@ -327,7 +327,7 @@ To find out which version of Flink is compatible with Beam please see the table Artifact Id - ≥ 2.31.0 + ≥ 2.31.0 1.13.x * beam-runners-flink-1.13 @@ -339,10 +339,6 @@ To find out which version of Flink is compatible with Beam please see the table 1.11.x * beam-runners-flink-1.11 - - 1.10.x - beam-runners-flink-1.10 - 2.30.0 1.12.x * diff --git a/website/www/site/content/en/documentation/sdks/java/testing/nexmark.md b/website/www/site/content/en/documentation/sdks/java/testing/nexmark.md index b37c32880114..2f3690b20f5b 100644 --- a/website/www/site/content/en/documentation/sdks/java/testing/nexmark.md +++ b/website/www/site/content/en/documentation/sdks/java/testing/nexmark.md @@ -147,7 +147,7 @@ When running via Gradle, the following two parameters control the execution: -P nexmark.runner The Gradle project name of the runner, such as ":runners:direct-java" or - ":runners:flink:1.10. The project names can be found in the root + ":runners:flink:1.13. The project names can be found in the root `settings.gradle.kts`. Test data is deterministically synthesized on demand. The test @@ -520,7 +520,7 @@ Streaming Mode: Batch Mode: ./gradlew :sdks:java:testing:nexmark:run \ - -Pnexmark.runner=":runners:flink:1.10" \ + -Pnexmark.runner=":runners:flink:1.13" \ -Pnexmark.args=" --runner=FlinkRunner --suite=SMOKE @@ -533,7 +533,7 @@ Batch Mode: Streaming Mode: ./gradlew :sdks:java:testing:nexmark:run \ - -Pnexmark.runner=":runners:flink:1.10" \ + -Pnexmark.runner=":runners:flink:1.13" \ -Pnexmark.args=" --runner=FlinkRunner --suite=SMOKE