From 8fdf81345ebef2149e73f4e6af8fcb792aaf4150 Mon Sep 17 00:00:00 2001 From: Jan Lukavsky Date: Fri, 24 May 2024 15:49:34 +0200 Subject: [PATCH] [runners-flink] Drop Flnk 1.14 and clean up code (#31112) --- .../test-properties.json | 4 +- gradle.properties | 2 +- .../runner-concepts/description.md | 2 +- runners/flink/1.14/build.gradle | 25 ----- .../1.14/job-server-container/build.gradle | 26 ------ runners/flink/1.14/job-server/build.gradle | 31 ------- .../AbstractStreamOperatorCompat.java | 91 ------------------- .../ProcessingTimeCallbackCompat.java | 22 ----- .../io/source/compat/FlinkSourceCompat.java | 28 ------ .../source/compat/SplitEnumeratorCompat.java | 27 ------ .../io/source/compat/package-info.java | 20 ---- .../beam/runners/flink/MiniClusterCompat.java | 29 ------ .../flink/metrics/MetricGroupWrapper.java | 31 ------- .../types/CoderTypeSerializer.java | 0 .../ProcessingTimeCallbackCompat.java | 22 ----- .../beam/runners/flink/MiniClusterCompat.java | 30 ------ .../FlinkStreamingTransformTranslators.java | 8 +- .../wrappers/streaming/DoFnOperator.java | 59 ++++++++++-- .../streaming/io/UnboundedSourceWrapper.java | 8 +- .../io/source/FlinkSourceReaderBase.java | 3 +- .../io/source/FlinkSourceSplitEnumerator.java | 9 +- .../runners/flink/FlinkSavepointTest.java | 6 +- .../runners/flink/RemoteMiniClusterImpl.java | 0 .../metrics/FlinkMetricContainerTest.java | 3 +- .../flink/streaming/StreamSources.java | 0 .../types/CoderTypeSerializerTest.java | 0 .../io/UnboundedSourceWrapperTest.java | 4 +- .../io/source/FlinkSourceReaderTestBase.java | 20 ++-- .../io/source/SourceTestMetrics.java} | 2 +- .../bounded/FlinkBoundedSourceReaderTest.java | 2 +- .../FlinkUnboundedSourceReaderTest.java | 2 +- sdks/go/examples/wasm/README.md | 2 +- .../apache_beam/options/pipeline_options.py | 2 +- .../src/apache_beam/runners/flink.ts | 2 +- settings.gradle.kts | 12 ++- .../content/en/documentation/runners/flink.md | 31 ++++++- .../flink_java_pipeline_options.html | 15 +++ .../flink_python_pipeline_options.html | 15 +++ 38 files changed, 157 insertions(+), 438 deletions(-) delete mode 100644 runners/flink/1.14/build.gradle delete mode 100644 runners/flink/1.14/job-server-container/build.gradle delete mode 100644 runners/flink/1.14/job-server/build.gradle delete mode 100644 runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java delete mode 100644 runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java delete mode 100644 runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java delete mode 100644 runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java delete mode 100644 runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/package-info.java delete mode 100644 runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java delete mode 100644 runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/metrics/MetricGroupWrapper.java rename runners/flink/{1.14 => 1.15}/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java (100%) delete mode 100644 runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java delete mode 100644 runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java rename runners/flink/{1.14 => }/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java (100%) rename runners/flink/{1.14 => }/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java (100%) rename runners/flink/{1.14 => }/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java (100%) rename runners/flink/{1.14/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java => src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestMetrics.java} (99%) diff --git a/.github/actions/setup-default-test-properties/test-properties.json b/.github/actions/setup-default-test-properties/test-properties.json index f2e1dce9922d..84c7691077e2 100644 --- a/.github/actions/setup-default-test-properties/test-properties.json +++ b/.github/actions/setup-default-test-properties/test-properties.json @@ -13,8 +13,8 @@ "TOX_ENV": ["Cloud", "Cython"] }, "JavaTestProperties": { - "SUPPORTED_VERSIONS": ["8", "11", "17"], - "FLINK_VERSIONS": ["1.13", "1.14", "1.15"], + "SUPPORTED_VERSIONS": ["8", "11", "17", "21"], + "FLINK_VERSIONS": ["1.15", "1.16", "1.17", "1.18"], "SPARK_VERSIONS": ["2", "3"] }, "GoTestProperties": { diff --git a/gradle.properties b/gradle.properties index d34b4dd7b08f..59c022609a55 100644 --- a/gradle.properties +++ b/gradle.properties @@ -39,6 +39,6 @@ docker_image_default_repo_root=apache docker_image_default_repo_prefix=beam_ # supported flink versions -flink_versions=1.14,1.15,1.16,1.17,1.18 +flink_versions=1.15,1.16,1.17,1.18 # supported python versions python_versions=3.8,3.9,3.10,3.11 diff --git a/learning/tour-of-beam/learning-content/java/introduction/introduction-concepts/runner-concepts/description.md b/learning/tour-of-beam/learning-content/java/introduction/introduction-concepts/runner-concepts/description.md index cd22a1f7c079..9a0a53f9b49c 100644 --- a/learning/tour-of-beam/learning-content/java/introduction/introduction-concepts/runner-concepts/description.md +++ b/learning/tour-of-beam/learning-content/java/introduction/introduction-concepts/runner-concepts/description.md @@ -111,7 +111,7 @@ Additionally, you can read [here](https://beam.apache.org/documentation/runners/ #### Run example ##### Portable -1. Starting with Beam 2.18.0, pre-built Flink Job Service Docker images are available at Docker Hub: `Flink 1.10`, `Flink 1.11`, `Flink 1.12`, `Flink 1.13`, `Flink 1.14`. +1. Starting with Beam 2.18.0, pre-built Flink Job Service Docker images are available at Docker Hub: `Flink 1.15`, `Flink 1.16`, `Flink 1.17`, `Flink 1.18`. 2. Start the JobService endpoint: `docker run --net=host apache/beam_flink1.10_job_server:latest` 3. Submit the pipeline to the above endpoint by using the PortableRunner, job_endpoint set to localhost:8099 (this is the default address of the JobService). Optionally set environment_type set to LOOPBACK. For example: diff --git a/runners/flink/1.14/build.gradle b/runners/flink/1.14/build.gradle deleted file mode 100644 index cfbd8f8dde79..000000000000 --- a/runners/flink/1.14/build.gradle +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -project.ext { - flink_major = '1.14' - flink_version = '1.14.3' -} - -// Load the main build script which contains all build logic. -apply from: "../flink_runner.gradle" diff --git a/runners/flink/1.14/job-server-container/build.gradle b/runners/flink/1.14/job-server-container/build.gradle deleted file mode 100644 index afdb68a0fc91..000000000000 --- a/runners/flink/1.14/job-server-container/build.gradle +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -def basePath = '../../job-server-container' - -project.ext { - resource_path = basePath -} - -// Load the main build script which contains all build logic. -apply from: "$basePath/flink_job_server_container.gradle" diff --git a/runners/flink/1.14/job-server/build.gradle b/runners/flink/1.14/job-server/build.gradle deleted file mode 100644 index 58f46bb7709c..000000000000 --- a/runners/flink/1.14/job-server/build.gradle +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -def basePath = '../../job-server' - -project.ext { - // Look for the source code in the parent module - main_source_dirs = ["$basePath/src/main/java"] - test_source_dirs = ["$basePath/src/test/java"] - main_resources_dirs = ["$basePath/src/main/resources"] - test_resources_dirs = ["$basePath/src/test/resources"] - archives_base_name = 'beam-runners-flink-1.14-job-server' -} - -// Load the main build script which contains all build logic. -apply from: "$basePath/flink_job_server.gradle" diff --git a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java deleted file mode 100644 index d8740964fda9..000000000000 --- a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming; - -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; -import org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl; -import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager; - -/** Compatibility layer for {@link AbstractStreamOperator} breaking changes. */ -public abstract class AbstractStreamOperatorCompat - extends AbstractStreamOperator { - - /** - * Getter for timeServiceManager, which has been made private in Flink 1.11. - * - * @return Time service manager. - */ - protected InternalTimeServiceManager getTimeServiceManagerCompat() { - return getTimeServiceManager() - .orElseThrow(() -> new IllegalStateException("Time service manager is not set.")); - } - - /** - * This call has been removed from {@link AbstractStreamOperator} in Flink 1.12. - * - *

{@link InternalTimeServiceManagerImpl#numProcessingTimeTimers()} - */ - protected int numProcessingTimeTimers() { - return getTimeServiceManager() - .map( - manager -> { - InternalTimeServiceManager tsm = getTimeServiceManagerCompat(); - if (tsm instanceof InternalTimeServiceManagerImpl) { - final InternalTimeServiceManagerImpl cast = - (InternalTimeServiceManagerImpl) getTimeServiceManagerCompat(); - return cast.numProcessingTimeTimers(); - } else if (tsm instanceof BatchExecutionInternalTimeServiceManager) { - return 0; - } else { - throw new IllegalStateException( - String.format( - "Unknown implementation of InternalTimerServiceManager. %s", tsm)); - } - }) - .orElse(0); - } - - /** Release all of the operator's resources. */ - abstract void cleanUp() throws Exception; - - /** Flush all remaining buffered data. */ - abstract void flushData() throws Exception; - - // Start with Flink 1.14, dispose() has been removed. finish() flushes remaining data, while - // close() no longer flushes data, close() now only releases the operator's resources. - // https://issues.apache.org/jira/browse/FLINK-22972 - - @Override - public void finish() throws Exception { - try { - flushData(); - } finally { - super.finish(); - } - } - - @Override - public void close() throws Exception { - try { - cleanUp(); - } finally { - super.close(); - } - } -} diff --git a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java deleted file mode 100644 index a494fec01dde..000000000000 --- a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming; - -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; - -public interface ProcessingTimeCallbackCompat extends ProcessingTimeCallback {} diff --git a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java deleted file mode 100644 index f68ae75d38e5..000000000000 --- a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat; - -import org.apache.flink.api.connector.source.SourceReaderContext; -import org.apache.flink.metrics.Counter; - -public class FlinkSourceCompat { - - public static Counter getNumRecordsInCounter(SourceReaderContext context) { - return context.metricGroup().getIOMetricGroup().getNumRecordsInCounter(); - } -} diff --git a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java deleted file mode 100644 index 06fdd781fc5c..000000000000 --- a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat; - -import org.apache.flink.api.connector.source.SourceSplit; -import org.apache.flink.api.connector.source.SplitEnumerator; - -public interface SplitEnumeratorCompat - extends SplitEnumerator { - - CheckpointT snapshotState() throws Exception; -} diff --git a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/package-info.java b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/package-info.java deleted file mode 100644 index 08bba20e576e..000000000000 --- a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** Classes helping maintain backwards compatibility across Flink versions. */ -package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat; diff --git a/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java b/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java deleted file mode 100644 index 1bbcd0159b1f..000000000000 --- a/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import java.util.concurrent.CompletableFuture; -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.minicluster.MiniCluster; - -public class MiniClusterCompat { - public static CompletableFuture triggerSavepoint( - MiniCluster cluster, JobID jobId, String targetDirectory, boolean cancelJob) { - return cluster.triggerSavepoint(jobId, targetDirectory, cancelJob); - } -} diff --git a/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/metrics/MetricGroupWrapper.java b/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/metrics/MetricGroupWrapper.java deleted file mode 100644 index ee1df83a2110..000000000000 --- a/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/metrics/MetricGroupWrapper.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.metrics; - -import org.apache.flink.metrics.groups.OperatorMetricGroup; -import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; - -/** - * Compatibility layer for {@link MetricGroup} breaking changes made in Flink 1.14 - * (https://issues.apache.org/jira/browse/FLINK-23652). - */ -public interface MetricGroupWrapper extends OperatorMetricGroup { - static OperatorMetricGroup createUnregisteredMetricGroup() { - return UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup(); - } -} diff --git a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java similarity index 100% rename from runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java rename to runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java diff --git a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java deleted file mode 100644 index 1b9baaef3f9c..000000000000 --- a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming; - -import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; - -public interface ProcessingTimeCallbackCompat extends ProcessingTimeCallback {} diff --git a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java deleted file mode 100644 index f02ad36116c9..000000000000 --- a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import java.util.concurrent.CompletableFuture; -import org.apache.flink.api.common.JobID; -import org.apache.flink.core.execution.SavepointFormatType; -import org.apache.flink.runtime.minicluster.MiniCluster; - -public class MiniClusterCompat { - public static CompletableFuture triggerSavepoint( - MiniCluster cluster, JobID jobId, String targetDirectory, boolean cancelJob) { - return cluster.triggerSavepoint(jobId, targetDirectory, cancelJob, SavepointFormatType.DEFAULT); - } -} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index 5726c11330b4..d1224400dfcd 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -49,7 +49,6 @@ import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector; -import org.apache.beam.runners.flink.translation.wrappers.streaming.ProcessingTimeCallbackCompat; import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem; import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder; import org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator; @@ -114,6 +113,7 @@ import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -1724,10 +1724,10 @@ public void translateNode(TestStream testStream, FlinkStreamingTranslationCon static class UnboundedSourceWrapperNoValueWithRecordId< OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark> extends RichParallelSourceFunction> - implements ProcessingTimeCallbackCompat, - BeamStoppableFunction, + implements BeamStoppableFunction, CheckpointListener, - CheckpointedFunction { + CheckpointedFunction, + ProcessingTimeCallback { private final UnboundedSourceWrapper unboundedSourceWrapper; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 3e64b1003ad3..3e5fb2a53d6e 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -89,11 +89,12 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; -import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.MapState; @@ -107,8 +108,10 @@ import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; +import org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl; import org.apache.flink.streaming.api.operators.InternalTimer; import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.InternalTimerServiceImpl; @@ -117,6 +120,7 @@ import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeService; +import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; @@ -141,8 +145,7 @@ "keyfor", "nullness" }) // TODO(https://github.com/apache/beam/issues/20497) -public class DoFnOperator - extends AbstractStreamOperatorCompat> +public class DoFnOperator extends AbstractStreamOperator> implements OneInputStreamOperator, WindowedValue>, TwoInputStreamOperator, RawUnionValue, WindowedValue>, Triggerable { @@ -218,7 +221,7 @@ public class DoFnOperator private static final int MAX_NUMBER_PENDING_BUNDLE_FINALIZATIONS = 32; protected transient InternalTimerService timerService; - private transient InternalTimeServiceManager timeServiceManagerCompat; + private transient InternalTimeServiceManager timeServiceManager; private transient PushedBackElementsHandler> pushedBackElementsHandler; @@ -451,7 +454,9 @@ public void initializeState(StateInitializationContext context) throws Exception } timerInternals = new FlinkTimerInternals(timerService); - timeServiceManagerCompat = getTimeServiceManagerCompat(); + timeServiceManager = + getTimeServiceManager() + .orElseThrow(() -> new IllegalStateException("Time service manager is not set.")); } outputManager = @@ -568,7 +573,6 @@ private void earlyBindStateIfNeeded() throws IllegalArgumentException, IllegalAc } } - @Override void cleanUp() throws Exception { Optional.ofNullable(flinkMetricContainer) .ifPresent(FlinkMetricContainer::registerMetricsForPipelineResult); @@ -577,7 +581,6 @@ void cleanUp() throws Exception { Optional.ofNullable(doFnInvoker).ifPresent(DoFnInvoker::invokeTeardown); } - @Override void flushData() throws Exception { // This is our last change to block shutdown of this operator while // there are still remaining processing-time timers. Flink will ignore pending @@ -618,6 +621,44 @@ void flushData() throws Exception { } } + @Override + public void finish() throws Exception { + try { + flushData(); + } finally { + super.finish(); + } + } + + @Override + public void close() throws Exception { + try { + cleanUp(); + } finally { + super.close(); + } + } + + protected int numProcessingTimeTimers() { + return getTimeServiceManager() + .map( + manager -> { + if (timeServiceManager instanceof InternalTimeServiceManagerImpl) { + final InternalTimeServiceManagerImpl cast = + (InternalTimeServiceManagerImpl) timeServiceManager; + return cast.numProcessingTimeTimers(); + } else if (timeServiceManager instanceof BatchExecutionInternalTimeServiceManager) { + return 0; + } else { + throw new IllegalStateException( + String.format( + "Unknown implementation of InternalTimerServiceManager. %s", + timeServiceManager)); + } + }) + .orElse(0); + } + public long getEffectiveInputWatermark() { // hold back by the pushed back values waiting for side inputs return Math.min(pushedBackWatermark, currentInputWatermark); @@ -746,7 +787,7 @@ public final void processWatermark1(Watermark mark) throws Exception { private void processInputWatermark(boolean advanceInputWatermark) throws Exception { long inputWatermarkHold = applyInputWatermarkHold(getEffectiveInputWatermark()); if (keyCoder != null && advanceInputWatermark) { - timeServiceManagerCompat.advanceWatermark(new Watermark(inputWatermarkHold)); + timeServiceManager.advanceWatermark(new Watermark(inputWatermarkHold)); } long potentialOutputWatermark = @@ -899,7 +940,7 @@ private void checkInvokeFinishBundleByTime() { } @SuppressWarnings("FutureReturnValueIgnored") - protected void scheduleForCurrentProcessingTime(ProcessingTimeCallbackCompat callback) { + protected void scheduleForCurrentProcessingTime(ProcessingTimeCallback callback) { // We are scheduling a timer for advancing the watermark, to not delay finishing the bundle // and temporarily release the checkpoint lock. Otherwise, we could potentially loop when a // timer keeps scheduling a timer for the same timestamp. diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index 92d0652e11f8..b0202d0db8d2 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -28,7 +28,6 @@ import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.runners.flink.translation.utils.Workarounds; -import org.apache.beam.runners.flink.translation.wrappers.streaming.ProcessingTimeCallbackCompat; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.SerializableCoder; @@ -44,6 +43,7 @@ import org.apache.beam.sdk.values.ValueWithRecordId; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.OperatorStateStore; @@ -67,10 +67,10 @@ }) public class UnboundedSourceWrapper extends RichParallelSourceFunction>> - implements ProcessingTimeCallbackCompat, - BeamStoppableFunction, + implements BeamStoppableFunction, CheckpointListener, - CheckpointedFunction { + CheckpointedFunction, + ProcessingTimeCallback { private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapper.class); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java index d4b3005c86c9..bd1607954484 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java @@ -41,7 +41,6 @@ import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.runners.flink.metrics.FlinkMetricContainerWithoutAccumulator; import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.FlinkSourceCompat; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.Source; @@ -129,7 +128,7 @@ protected FlinkSourceReaderBase( this.idleTimeoutFuture = new CompletableFuture<>(); this.waitingForSplitChangeFuture = new CompletableFuture<>(); this.idleTimeoutCountingDown = false; - this.numRecordsInCounter = FlinkSourceCompat.getNumRecordsInCounter(context); + this.numRecordsInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter(); FlinkMetricContainerWithoutAccumulator metricsContainer = new FlinkMetricContainerWithoutAccumulator(context.metricGroup()); this.invocationUtil = new ReaderInvocationUtil<>(stepName, pipelineOptions, metricsContainer); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java index 292697479bcd..99adf12d37d8 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java @@ -25,11 +25,11 @@ import java.util.List; import java.util.Map; import javax.annotation.Nullable; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.SplitEnumeratorCompat; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.api.connector.source.SplitsAssignment; import org.slf4j.Logger; @@ -49,7 +49,7 @@ * @param The output type of the encapsulated Beam {@link Source}. */ public class FlinkSourceSplitEnumerator - implements SplitEnumeratorCompat, Map>>> { + implements SplitEnumerator, Map>>> { private static final Logger LOG = LoggerFactory.getLogger(FlinkSourceSplitEnumerator.class); private final SplitEnumeratorContext> context; private final Source beamSource; @@ -133,11 +133,6 @@ public void addReader(int subtaskId) { @Override public Map>> snapshotState(long checkpointId) throws Exception { LOG.info("Taking snapshot for checkpoint {}", checkpointId); - return snapshotState(); - } - - @Override - public Map>> snapshotState() throws Exception { return pendingSplits; } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java index 32fd7f8a1060..12e9ad1a0adc 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java @@ -25,6 +25,7 @@ import java.util.Objects; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -58,6 +59,7 @@ import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; +import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; @@ -276,7 +278,9 @@ private String takeSavepoint(JobID jobID) throws Exception { // try multiple times because the job might not be ready yet for (int i = 0; i < 10; i++) { try { - return MiniClusterCompat.triggerSavepoint(flinkCluster, jobID, null, false).get(); + CompletableFuture savepointFuture = + flinkCluster.triggerSavepoint(jobID, null, false, SavepointFormatType.DEFAULT); + return savepointFuture.get(); } catch (Exception e) { exception = e; LOG.debug("Exception while triggerSavepoint, trying again", e); diff --git a/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java similarity index 100% rename from runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java index 930ad88b0dcb..f91d99dd5abe 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java @@ -49,6 +49,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentMatcher; @@ -59,7 +60,7 @@ public class FlinkMetricContainerTest { @Mock private RuntimeContext runtimeContext; - @Mock private MetricGroupWrapper metricGroup; + @Mock private OperatorMetricGroup metricGroup; FlinkMetricContainer container; diff --git a/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java similarity index 100% rename from runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java diff --git a/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java similarity index 100% rename from runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java index 2b6a86391420..8e754d82194b 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java @@ -42,7 +42,6 @@ import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource; import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.runners.flink.metrics.FlinkMetricContainer; -import org.apache.beam.runners.flink.metrics.MetricGroupWrapper; import org.apache.beam.runners.flink.streaming.StreamSources; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -58,6 +57,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.StreamSource; @@ -710,7 +710,7 @@ public void testSequentialReadingFromBoundedSource() throws Exception { when(runtimeContextMock.getProcessingTimeService()).thenReturn(processingTimeService); when(runtimeContextMock.getMetricGroup()) - .thenReturn(MetricGroupWrapper.createUnregisteredMetricGroup()); + .thenReturn(UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()); sourceWrapper.setRuntimeContext(runtimeContextMock); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderTestBase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderTestBase.java index 462a1ba0153d..82440abf926c 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderTestBase.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderTestBase.java @@ -189,7 +189,7 @@ public void testNumBytesInMetrics() throws Exception { final int numRecordsPerSplit = 10; List>> splits = createSplits(numSplits, numRecordsPerSplit, 0); - SourceTestCompat.TestMetricGroup testMetricGroup = new SourceTestCompat.TestMetricGroup(); + SourceTestMetrics.TestMetricGroup testMetricGroup = new SourceTestMetrics.TestMetricGroup(); try (SourceReader>> reader = createReader(null, -1L, null, testMetricGroup)) { pollAndValidate(reader, splits, false); @@ -201,7 +201,7 @@ public void testNumBytesInMetrics() throws Exception { public void testMetricsContainer() throws Exception { ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService(); - SourceTestCompat.TestMetricGroup testMetricGroup = new SourceTestCompat.TestMetricGroup(); + SourceTestMetrics.TestMetricGroup testMetricGroup = new SourceTestMetrics.TestMetricGroup(); try (SourceReader>> reader = createReader(executor, 0L, null, testMetricGroup)) { reader.start(); @@ -229,24 +229,24 @@ protected abstract SourceReader>> ScheduledExecutorService executor, long idleTimeoutMs, @Nullable Function timestampExtractor, - SourceTestCompat.TestMetricGroup testMetricGroup); + SourceTestMetrics.TestMetricGroup testMetricGroup); protected abstract Source> createBeamSource( int splitIndex, int numRecordsPerSplit); // ------------------- protected helper methods ---------------------- protected SourceReader>> createReader() { - return createReader(null, -1L, null, new SourceTestCompat.TestMetricGroup()); + return createReader(null, -1L, null, new SourceTestMetrics.TestMetricGroup()); } protected SourceReader>> createReader( Function timestampExtractor) { - return createReader(null, -1L, timestampExtractor, new SourceTestCompat.TestMetricGroup()); + return createReader(null, -1L, timestampExtractor, new SourceTestMetrics.TestMetricGroup()); } protected SourceReader>> createReader( ScheduledExecutorService executor, long idleTimeoutMs) { - return createReader(executor, idleTimeoutMs, null, new SourceTestCompat.TestMetricGroup()); + return createReader(executor, idleTimeoutMs, null, new SourceTestMetrics.TestMetricGroup()); } protected SourceReader>> createReader( @@ -254,7 +254,7 @@ protected SourceReader>> createRe long idleTimeoutMs, Function timestampExtractor) { return createReader( - executor, idleTimeoutMs, timestampExtractor, new SourceTestCompat.TestMetricGroup()); + executor, idleTimeoutMs, timestampExtractor, new SourceTestMetrics.TestMetricGroup()); } protected void pollAndValidate( @@ -307,14 +307,14 @@ protected void verifyBeamReaderClosed(List } protected static SourceReaderContext createSourceReaderContext( - SourceTestCompat.TestMetricGroup metricGroup) { + SourceTestMetrics.TestMetricGroup metricGroup) { SourceReaderContext mockContext = Mockito.mock(SourceReaderContext.class); when(mockContext.metricGroup()).thenReturn(metricGroup); return mockContext; } // -------------------- protected helper class for fetch result validation --------------------- - protected class RecordsValidatingOutput implements SourceTestCompat.ReaderOutputCompat { + protected class RecordsValidatingOutput implements SourceTestMetrics.ReaderOutputCompat { private final List>> sources; private final Map sourceOutputs; private int numCollectedRecords = 0; @@ -384,7 +384,7 @@ public Map createdSourceOutputs() { } } - protected class TestSourceOutput implements SourceTestCompat.SourceOutputCompat { + protected class TestSourceOutput implements SourceTestMetrics.SourceOutputCompat { private final ReaderOutput output; private @Nullable Watermark watermark; private boolean isIdle; diff --git a/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestMetrics.java similarity index 99% rename from runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestMetrics.java index 8cda1341fd22..70cb460d1437 100644 --- a/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestMetrics.java @@ -28,7 +28,7 @@ import org.apache.flink.metrics.groups.SourceReaderMetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; -public class SourceTestCompat { +public class SourceTestMetrics { /** A MetricGroup implementation which records the registered gauge. */ public static class TestMetricGroup extends UnregisteredMetricsGroup diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReaderTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReaderTest.java index 84cb2a72ddaf..556f47a4f580 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReaderTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReaderTest.java @@ -30,7 +30,7 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestBoundedCountingSource; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderTestBase; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.SourceTestCompat.TestMetricGroup; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.SourceTestMetrics.TestMetricGroup; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java index b7cba373cf75..b858df5ff4bb 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java @@ -36,7 +36,7 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestCountingSource; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderTestBase; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.SourceTestCompat.TestMetricGroup; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.SourceTestMetrics.TestMetricGroup; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.WindowedValue; diff --git a/sdks/go/examples/wasm/README.md b/sdks/go/examples/wasm/README.md index ad25ce87771c..db97fbdf62d4 100644 --- a/sdks/go/examples/wasm/README.md +++ b/sdks/go/examples/wasm/README.md @@ -68,7 +68,7 @@ cd $BEAM_HOME Expected output should include the following, from which you acquire the latest flink runner version. ```shell -'flink_versions: 1.12,1.13,1.14,1.15' +'flink_versions: 1.15,1.16,1.17,1.18' ``` #### 2. Set to the latest flink runner version i.e. 1.15 diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 80cf32e111df..7bf2e509bed3 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1459,7 +1459,7 @@ def _add_argparse_args(cls, parser): class FlinkRunnerOptions(PipelineOptions): # These should stay in sync with gradle.properties. - PUBLISHED_FLINK_VERSIONS = ['1.12', '1.13', '1.14', '1.15', '1.16'] + PUBLISHED_FLINK_VERSIONS = ['1.15', '1.16', '1.17', '1.18'] @classmethod def _add_argparse_args(cls, parser): diff --git a/sdks/typescript/src/apache_beam/runners/flink.ts b/sdks/typescript/src/apache_beam/runners/flink.ts index db6c7e8b0a1f..d41d0eff2dc5 100644 --- a/sdks/typescript/src/apache_beam/runners/flink.ts +++ b/sdks/typescript/src/apache_beam/runners/flink.ts @@ -28,7 +28,7 @@ import { JavaJarService } from "../utils/service"; const MAGIC_HOST_NAMES = ["[local]", "[auto]"]; // These should stay in sync with gradle.properties. -const PUBLISHED_FLINK_VERSIONS = ["1.12", "1.13", "1.14"]; +const PUBLISHED_FLINK_VERSIONS = ["1.15", "1.16", "1.17, "1.18""]; const defaultOptions = { flinkMaster: "[local]", diff --git a/settings.gradle.kts b/settings.gradle.kts index 8a9cf8de77db..0b82ea997ccc 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -79,10 +79,14 @@ include(":runners:core-java") include(":runners:direct-java") include(":runners:extensions-java:metrics") /* Begin Flink Runner related settings */ -// Flink 1.14 -include(":runners:flink:1.14") -include(":runners:flink:1.14:job-server") -include(":runners:flink:1.14:job-server-container") +/* When updating these versions, please make sure that the following files are updated as well: + * FLINK_VERSIONS in .github/actions/setup-default-test-properties/test-properties.json + * flink_versions in sdks/go/examples/wasm/README.md + * PUBLISHED_FLINK_VERSIONS in sdks/python/apache_beam/options/pipeline_options.py + * PUBLISHED_FLINK_VERSIONS in sdks/typescript/src/apache_beam/runners/flink.ts + * verify versions in website/www/site/content/en/documentation/runners/flink.md + * verify version in sdks/python/apache_beam/runners/interactive/interactive_beam.py + */ // Flink 1.15 include(":runners:flink:1.15") include(":runners:flink:1.15:job-server") diff --git a/website/www/site/content/en/documentation/runners/flink.md b/website/www/site/content/en/documentation/runners/flink.md index 2fc4bc83bb70..474ae5fca5a2 100644 --- a/website/www/site/content/en/documentation/runners/flink.md +++ b/website/www/site/content/en/documentation/runners/flink.md @@ -196,11 +196,18 @@ The optional `flink_version` option may be required as well for older versions o {{< paragraph class="language-portable" >}} Starting with Beam 2.18.0, pre-built Flink Job Service Docker images are available at Docker Hub: +<<<<<<< HEAD [Flink 1.10](https://hub.docker.com/r/apache/beam_flink1.10_job_server), [Flink 1.11](https://hub.docker.com/r/apache/beam_flink1.11_job_server), [Flink 1.12](https://hub.docker.com/r/apache/beam_flink1.12_job_server). [Flink 1.13](https://hub.docker.com/r/apache/beam_flink1.13_job_server). [Flink 1.14](https://hub.docker.com/r/apache/beam_flink1.14_job_server). +======= +[Flink 1.15](https://hub.docker.com/r/apache/beam_flink1.15_job_server). +[Flink 1.16](https://hub.docker.com/r/apache/beam_flink1.16_job_server). +[Flink 1.17](https://hub.docker.com/r/apache/beam_flink1.17_job_server). +[Flink 1.18](https://hub.docker.com/r/apache/beam_flink1.18_job_server). +>>>>>>> c10f5d40456 ([runners-flink] Drop Flnk 1.14 and clean up code (#31112)) {{< /paragraph >}} @@ -328,9 +335,29 @@ To find out which version of Flink is compatible with Beam please see the table Artifact Id - ≥ 2.38.0 - 1.14.x * + 1.18.x + beam-runners-flink-1.18 + ≥ 2.57.0 + + + 1.17.x + beam-runners-flink-1.17 + ≥ 2.56.0 + + + 1.16.x + beam-runners-flink-1.16 + ≥ 2.47.0 + + + 1.15.x + beam-runners-flink-1.15 + ≥ 2.40.0 + + + 1.14.x beam-runners-flink-1.14 + 2.38.0 - 2.56.0 1.13.x * diff --git a/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html b/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html index abefaa23e6b1..77d57cbd4d2d 100644 --- a/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html +++ b/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html @@ -97,6 +97,16 @@ Address of the Flink Master where the Pipeline should be executed. Can either be of the form "host:port" or one of the special values [local], [collection] or [auto]. Default: [auto] + + forceUnalignedCheckpointEnabled + Forces unaligned checkpoints, particularly allowing them for iterative jobs. + Default: false + + + jobCheckIntervalInSecs + Set job check interval in seconds under detached mode in method waitUntilFinish, by default it is 5 seconds + Default: 5 + latencyTrackingInterval Interval in milliseconds for sending latency tracking marks from the sources to the sinks. Interval value <= 0 disables the feature. @@ -187,4 +197,9 @@ State backend path to persist state backend data. Used to initialize state backend. + + unalignedCheckpointEnabled + If set, Unaligned checkpoints contain in-flight data (i.e., data stored in buffers) as part of the checkpoint state, allowing checkpoint barriers to overtake these buffers. Thus, the checkpoint duration becomes independent of the current throughput as checkpoint barriers are effectively not embedded into the stream of data anymore + Default: false + diff --git a/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html b/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html index b0abe63e4896..adf7df76e3f9 100644 --- a/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html +++ b/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html @@ -97,6 +97,16 @@ Address of the Flink Master where the Pipeline should be executed. Can either be of the form "host:port" or one of the special values [local], [collection] or [auto]. Default: [auto] + + force_unaligned_checkpoint_enabled + Forces unaligned checkpoints, particularly allowing them for iterative jobs. + Default: false + + + job_check_interval_in_secs + Set job check interval in seconds under detached mode in method waitUntilFinish, by default it is 5 seconds + Default: 5 + latency_tracking_interval Interval in milliseconds for sending latency tracking marks from the sources to the sinks. Interval value <= 0 disables the feature. @@ -187,4 +197,9 @@ State backend path to persist state backend data. Used to initialize state backend. + + unaligned_checkpoint_enabled + If set, Unaligned checkpoints contain in-flight data (i.e., data stored in buffers) as part of the checkpoint state, allowing checkpoint barriers to overtake these buffers. Thus, the checkpoint duration becomes independent of the current throughput as checkpoint barriers are effectively not embedded into the stream of data anymore + Default: false +