From c10f5d404563c992f7a7f8f55d265657726d1604 Mon Sep 17 00:00:00 2001 From: Jan Lukavsky Date: Fri, 24 May 2024 15:49:34 +0200 Subject: [PATCH] [runners-flink] Drop Flnk 1.14 and clean up code (#31112) --- .../test-properties.json | 2 +- gradle.properties | 2 +- .../runner-concepts/description.md | 2 +- runners/flink/1.14/build.gradle | 25 ----- .../1.14/job-server-container/build.gradle | 26 ------ runners/flink/1.14/job-server/build.gradle | 31 ------- .../AbstractStreamOperatorCompat.java | 91 ------------------- .../ProcessingTimeCallbackCompat.java | 22 ----- .../io/source/compat/FlinkSourceCompat.java | 28 ------ .../source/compat/SplitEnumeratorCompat.java | 27 ------ .../io/source/compat/package-info.java | 20 ---- .../beam/runners/flink/MiniClusterCompat.java | 29 ------ .../flink/metrics/MetricGroupWrapper.java | 31 ------- .../types/CoderTypeSerializer.java | 0 .../ProcessingTimeCallbackCompat.java | 22 ----- .../beam/runners/flink/MiniClusterCompat.java | 30 ------ .../FlinkStreamingTransformTranslators.java | 8 +- .../wrappers/streaming/DoFnOperator.java | 57 ++++++++++-- .../streaming/io/UnboundedSourceWrapper.java | 8 +- .../io/source/FlinkSourceReaderBase.java | 3 +- .../io/source/FlinkSourceSplitEnumerator.java | 9 +- .../runners/flink/FlinkSavepointTest.java | 6 +- .../runners/flink/RemoteMiniClusterImpl.java | 0 .../metrics/FlinkMetricContainerTest.java | 3 +- .../flink/streaming/StreamSources.java | 0 .../types/CoderTypeSerializerTest.java | 0 .../io/UnboundedSourceWrapperTest.java | 4 +- .../io/source/FlinkSourceReaderTestBase.java | 20 ++-- .../io/source/SourceTestMetrics.java} | 2 +- .../bounded/FlinkBoundedSourceReaderTest.java | 2 +- .../FlinkUnboundedSourceReaderTest.java | 2 +- sdks/go/examples/wasm/README.md | 2 +- .../apache_beam/options/pipeline_options.py | 2 +- .../src/apache_beam/runners/flink.ts | 2 +- settings.gradle.kts | 4 - .../content/en/documentation/runners/flink.md | 9 +- .../flink_java_pipeline_options.html | 10 ++ .../flink_python_pipeline_options.html | 10 ++ 38 files changed, 115 insertions(+), 436 deletions(-) delete mode 100644 runners/flink/1.14/build.gradle delete mode 100644 runners/flink/1.14/job-server-container/build.gradle delete mode 100644 runners/flink/1.14/job-server/build.gradle delete mode 100644 runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java delete mode 100644 runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java delete mode 100644 runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java delete mode 100644 runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java delete mode 100644 runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/package-info.java delete mode 100644 runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java delete mode 100644 runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/metrics/MetricGroupWrapper.java rename runners/flink/{1.14 => 1.15}/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java (100%) delete mode 100644 runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java delete mode 100644 runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java rename runners/flink/{1.14 => }/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java (100%) rename runners/flink/{1.14 => }/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java (100%) rename runners/flink/{1.14 => }/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java (100%) rename runners/flink/{1.14/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java => src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestMetrics.java} (99%) diff --git a/.github/actions/setup-default-test-properties/test-properties.json b/.github/actions/setup-default-test-properties/test-properties.json index 4a43463eb2f9..d7e1ef55bf53 100644 --- a/.github/actions/setup-default-test-properties/test-properties.json +++ b/.github/actions/setup-default-test-properties/test-properties.json @@ -14,7 +14,7 @@ }, "JavaTestProperties": { "SUPPORTED_VERSIONS": ["8", "11", "17", "21"], - "FLINK_VERSIONS": ["1.14", "1.15", "1.16", "1.17"], + "FLINK_VERSIONS": ["1.15", "1.16", "1.17", "1.18"], "SPARK_VERSIONS": ["2", "3"] }, "GoTestProperties": { diff --git a/gradle.properties b/gradle.properties index f3dea792ccc1..c2ce1414a225 100644 --- a/gradle.properties +++ b/gradle.properties @@ -39,6 +39,6 @@ docker_image_default_repo_root=apache docker_image_default_repo_prefix=beam_ # supported flink versions -flink_versions=1.14,1.15,1.16,1.17,1.18 +flink_versions=1.15,1.16,1.17,1.18 # supported python versions python_versions=3.8,3.9,3.10,3.11,3.12 diff --git a/learning/tour-of-beam/learning-content/introduction/introduction-concepts/runner-concepts/description.md b/learning/tour-of-beam/learning-content/introduction/introduction-concepts/runner-concepts/description.md index 47c0d92286cf..3989f6de6510 100644 --- a/learning/tour-of-beam/learning-content/introduction/introduction-concepts/runner-concepts/description.md +++ b/learning/tour-of-beam/learning-content/introduction/introduction-concepts/runner-concepts/description.md @@ -191,7 +191,7 @@ $ wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \ {{if (eq .Sdk "java")}} ##### Portable -1. Starting with Beam 2.18.0, pre-built Flink Job Service Docker images are available at Docker Hub: `Flink 1.14`, `Flink 1.15`, `Flink 1.16`, `Flink 1.17`. +1. Starting with Beam 2.18.0, pre-built Flink Job Service Docker images are available at Docker Hub: `Flink 1.15`, `Flink 1.16`, `Flink 1.17`, `Flink 1.18`. 2. Start the JobService endpoint: `docker run --net=host apache/beam_flink1.10_job_server:latest` 3. Submit the pipeline to the above endpoint by using the PortableRunner, job_endpoint set to localhost:8099 (this is the default address of the JobService). Optionally set environment_type set to LOOPBACK. For example: diff --git a/runners/flink/1.14/build.gradle b/runners/flink/1.14/build.gradle deleted file mode 100644 index cfbd8f8dde79..000000000000 --- a/runners/flink/1.14/build.gradle +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -project.ext { - flink_major = '1.14' - flink_version = '1.14.3' -} - -// Load the main build script which contains all build logic. -apply from: "../flink_runner.gradle" diff --git a/runners/flink/1.14/job-server-container/build.gradle b/runners/flink/1.14/job-server-container/build.gradle deleted file mode 100644 index afdb68a0fc91..000000000000 --- a/runners/flink/1.14/job-server-container/build.gradle +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -def basePath = '../../job-server-container' - -project.ext { - resource_path = basePath -} - -// Load the main build script which contains all build logic. -apply from: "$basePath/flink_job_server_container.gradle" diff --git a/runners/flink/1.14/job-server/build.gradle b/runners/flink/1.14/job-server/build.gradle deleted file mode 100644 index 58f46bb7709c..000000000000 --- a/runners/flink/1.14/job-server/build.gradle +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -def basePath = '../../job-server' - -project.ext { - // Look for the source code in the parent module - main_source_dirs = ["$basePath/src/main/java"] - test_source_dirs = ["$basePath/src/test/java"] - main_resources_dirs = ["$basePath/src/main/resources"] - test_resources_dirs = ["$basePath/src/test/resources"] - archives_base_name = 'beam-runners-flink-1.14-job-server' -} - -// Load the main build script which contains all build logic. -apply from: "$basePath/flink_job_server.gradle" diff --git a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java deleted file mode 100644 index d8740964fda9..000000000000 --- a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming; - -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; -import org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl; -import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager; - -/** Compatibility layer for {@link AbstractStreamOperator} breaking changes. */ -public abstract class AbstractStreamOperatorCompat - extends AbstractStreamOperator { - - /** - * Getter for timeServiceManager, which has been made private in Flink 1.11. - * - * @return Time service manager. - */ - protected InternalTimeServiceManager getTimeServiceManagerCompat() { - return getTimeServiceManager() - .orElseThrow(() -> new IllegalStateException("Time service manager is not set.")); - } - - /** - * This call has been removed from {@link AbstractStreamOperator} in Flink 1.12. - * - *

{@link InternalTimeServiceManagerImpl#numProcessingTimeTimers()} - */ - protected int numProcessingTimeTimers() { - return getTimeServiceManager() - .map( - manager -> { - InternalTimeServiceManager tsm = getTimeServiceManagerCompat(); - if (tsm instanceof InternalTimeServiceManagerImpl) { - final InternalTimeServiceManagerImpl cast = - (InternalTimeServiceManagerImpl) getTimeServiceManagerCompat(); - return cast.numProcessingTimeTimers(); - } else if (tsm instanceof BatchExecutionInternalTimeServiceManager) { - return 0; - } else { - throw new IllegalStateException( - String.format( - "Unknown implementation of InternalTimerServiceManager. %s", tsm)); - } - }) - .orElse(0); - } - - /** Release all of the operator's resources. */ - abstract void cleanUp() throws Exception; - - /** Flush all remaining buffered data. */ - abstract void flushData() throws Exception; - - // Start with Flink 1.14, dispose() has been removed. finish() flushes remaining data, while - // close() no longer flushes data, close() now only releases the operator's resources. - // https://issues.apache.org/jira/browse/FLINK-22972 - - @Override - public void finish() throws Exception { - try { - flushData(); - } finally { - super.finish(); - } - } - - @Override - public void close() throws Exception { - try { - cleanUp(); - } finally { - super.close(); - } - } -} diff --git a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java deleted file mode 100644 index a494fec01dde..000000000000 --- a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming; - -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; - -public interface ProcessingTimeCallbackCompat extends ProcessingTimeCallback {} diff --git a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java deleted file mode 100644 index f68ae75d38e5..000000000000 --- a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat; - -import org.apache.flink.api.connector.source.SourceReaderContext; -import org.apache.flink.metrics.Counter; - -public class FlinkSourceCompat { - - public static Counter getNumRecordsInCounter(SourceReaderContext context) { - return context.metricGroup().getIOMetricGroup().getNumRecordsInCounter(); - } -} diff --git a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java deleted file mode 100644 index 06fdd781fc5c..000000000000 --- a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat; - -import org.apache.flink.api.connector.source.SourceSplit; -import org.apache.flink.api.connector.source.SplitEnumerator; - -public interface SplitEnumeratorCompat - extends SplitEnumerator { - - CheckpointT snapshotState() throws Exception; -} diff --git a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/package-info.java b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/package-info.java deleted file mode 100644 index 08bba20e576e..000000000000 --- a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** Classes helping maintain backwards compatibility across Flink versions. */ -package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat; diff --git a/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java b/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java deleted file mode 100644 index 1bbcd0159b1f..000000000000 --- a/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import java.util.concurrent.CompletableFuture; -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.minicluster.MiniCluster; - -public class MiniClusterCompat { - public static CompletableFuture triggerSavepoint( - MiniCluster cluster, JobID jobId, String targetDirectory, boolean cancelJob) { - return cluster.triggerSavepoint(jobId, targetDirectory, cancelJob); - } -} diff --git a/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/metrics/MetricGroupWrapper.java b/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/metrics/MetricGroupWrapper.java deleted file mode 100644 index ee1df83a2110..000000000000 --- a/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/metrics/MetricGroupWrapper.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.metrics; - -import org.apache.flink.metrics.groups.OperatorMetricGroup; -import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; - -/** - * Compatibility layer for {@link MetricGroup} breaking changes made in Flink 1.14 - * (https://issues.apache.org/jira/browse/FLINK-23652). - */ -public interface MetricGroupWrapper extends OperatorMetricGroup { - static OperatorMetricGroup createUnregisteredMetricGroup() { - return UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup(); - } -} diff --git a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java similarity index 100% rename from runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java rename to runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java diff --git a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java deleted file mode 100644 index 1b9baaef3f9c..000000000000 --- a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming; - -import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; - -public interface ProcessingTimeCallbackCompat extends ProcessingTimeCallback {} diff --git a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java deleted file mode 100644 index f02ad36116c9..000000000000 --- a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import java.util.concurrent.CompletableFuture; -import org.apache.flink.api.common.JobID; -import org.apache.flink.core.execution.SavepointFormatType; -import org.apache.flink.runtime.minicluster.MiniCluster; - -public class MiniClusterCompat { - public static CompletableFuture triggerSavepoint( - MiniCluster cluster, JobID jobId, String targetDirectory, boolean cancelJob) { - return cluster.triggerSavepoint(jobId, targetDirectory, cancelJob, SavepointFormatType.DEFAULT); - } -} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index d2171d27a142..f9089d11a25e 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -38,7 +38,6 @@ import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector; -import org.apache.beam.runners.flink.translation.wrappers.streaming.ProcessingTimeCallbackCompat; import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem; import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder; import org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator; @@ -102,6 +101,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; @@ -1515,10 +1515,10 @@ void translateNode(TestStream testStream, FlinkStreamingTranslationContext co static class UnboundedSourceWrapperNoValueWithRecordId< OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark> extends RichParallelSourceFunction> - implements ProcessingTimeCallbackCompat, - BeamStoppableFunction, + implements BeamStoppableFunction, CheckpointListener, - CheckpointedFunction { + CheckpointedFunction, + ProcessingTimeCallback { private final UnboundedSourceWrapper unboundedSourceWrapper; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 63f5ede00242..1072702c3e66 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -93,6 +93,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.MapState; @@ -106,8 +107,10 @@ import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; +import org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl; import org.apache.flink.streaming.api.operators.InternalTimer; import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.InternalTimerServiceImpl; @@ -116,6 +119,7 @@ import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeService; +import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; @@ -140,8 +144,7 @@ "keyfor", "nullness" }) // TODO(https://github.com/apache/beam/issues/20497) -public class DoFnOperator - extends AbstractStreamOperatorCompat> +public class DoFnOperator extends AbstractStreamOperator> implements OneInputStreamOperator, WindowedValue>, TwoInputStreamOperator, RawUnionValue, WindowedValue>, Triggerable { @@ -225,7 +228,7 @@ public class DoFnOperator private static final int MAX_NUMBER_PENDING_BUNDLE_FINALIZATIONS = 32; protected transient InternalTimerService timerService; - private transient InternalTimeServiceManager timeServiceManagerCompat; + private transient InternalTimeServiceManager timeServiceManager; private transient PushedBackElementsHandler> pushedBackElementsHandler; @@ -471,7 +474,9 @@ public void initializeState(StateInitializationContext context) throws Exception } timerInternals = new FlinkTimerInternals(timerService); - timeServiceManagerCompat = getTimeServiceManagerCompat(); + timeServiceManager = + getTimeServiceManager() + .orElseThrow(() -> new IllegalStateException("Time service manager is not set.")); } outputManager = @@ -605,7 +610,6 @@ private void earlyBindStateIfNeeded() throws IllegalArgumentException, IllegalAc } } - @Override void cleanUp() throws Exception { Optional.ofNullable(flinkMetricContainer) .ifPresent(FlinkMetricContainer::registerMetricsForPipelineResult); @@ -614,7 +618,6 @@ void cleanUp() throws Exception { Optional.ofNullable(doFnInvoker).ifPresent(DoFnInvoker::invokeTeardown); } - @Override void flushData() throws Exception { // This is our last change to block shutdown of this operator while // there are still remaining processing-time timers. Flink will ignore pending @@ -663,6 +666,44 @@ void flushData() throws Exception { } } + @Override + public void finish() throws Exception { + try { + flushData(); + } finally { + super.finish(); + } + } + + @Override + public void close() throws Exception { + try { + cleanUp(); + } finally { + super.close(); + } + } + + protected int numProcessingTimeTimers() { + return getTimeServiceManager() + .map( + manager -> { + if (timeServiceManager instanceof InternalTimeServiceManagerImpl) { + final InternalTimeServiceManagerImpl cast = + (InternalTimeServiceManagerImpl) timeServiceManager; + return cast.numProcessingTimeTimers(); + } else if (timeServiceManager instanceof BatchExecutionInternalTimeServiceManager) { + return 0; + } else { + throw new IllegalStateException( + String.format( + "Unknown implementation of InternalTimerServiceManager. %s", + timeServiceManager)); + } + }) + .orElse(0); + } + public long getEffectiveInputWatermark() { // hold back by the pushed back values waiting for side inputs long combinedPushedBackWatermark = pushedBackWatermark; @@ -796,7 +837,7 @@ public final void processWatermark1(Watermark mark) throws Exception { private void processInputWatermark(boolean advanceInputWatermark) throws Exception { long inputWatermarkHold = applyInputWatermarkHold(getEffectiveInputWatermark()); if (keyCoder != null && advanceInputWatermark) { - timeServiceManagerCompat.advanceWatermark(new Watermark(inputWatermarkHold)); + timeServiceManager.advanceWatermark(new Watermark(inputWatermarkHold)); } long potentialOutputWatermark = @@ -949,7 +990,7 @@ private void checkInvokeFinishBundleByTime() { } @SuppressWarnings("FutureReturnValueIgnored") - protected void scheduleForCurrentProcessingTime(ProcessingTimeCallbackCompat callback) { + protected void scheduleForCurrentProcessingTime(ProcessingTimeCallback callback) { // We are scheduling a timer for advancing the watermark, to not delay finishing the bundle // and temporarily release the checkpoint lock. Otherwise, we could potentially loop when a // timer keeps scheduling a timer for the same timestamp. diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index 010d341c39a7..d6beafad8d93 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -27,7 +27,6 @@ import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.runners.flink.translation.utils.Workarounds; -import org.apache.beam.runners.flink.translation.wrappers.streaming.ProcessingTimeCallbackCompat; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.SerializableCoder; @@ -44,6 +43,7 @@ import org.apache.beam.sdk.values.ValueWithRecordId; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.OperatorStateStore; @@ -67,10 +67,10 @@ }) public class UnboundedSourceWrapper extends RichParallelSourceFunction>> - implements ProcessingTimeCallbackCompat, - BeamStoppableFunction, + implements BeamStoppableFunction, CheckpointListener, - CheckpointedFunction { + CheckpointedFunction, + ProcessingTimeCallback { private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapper.class); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java index 1f4f31f90ebf..db0df8676644 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java @@ -38,7 +38,6 @@ import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.runners.flink.metrics.FlinkMetricContainerWithoutAccumulator; import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.FlinkSourceCompat; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.io.UnboundedSource; @@ -124,7 +123,7 @@ protected FlinkSourceReaderBase( pipelineOptions.as(FlinkPipelineOptions.class).getShutdownSourcesAfterIdleMs(); this.idleTimeoutFuture = new CompletableFuture<>(); this.idleTimeoutCountingDown = false; - this.numRecordsInCounter = FlinkSourceCompat.getNumRecordsInCounter(context); + this.numRecordsInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter(); FlinkMetricContainerWithoutAccumulator metricsContainer = new FlinkMetricContainerWithoutAccumulator(context.metricGroup()); this.invocationUtil = new ReaderInvocationUtil<>(stepName, pipelineOptions, metricsContainer); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java index 70afd76b611b..be2e8ad9ad77 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java @@ -25,11 +25,11 @@ import java.util.List; import java.util.Map; import javax.annotation.Nullable; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.SplitEnumeratorCompat; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.api.connector.source.SplitsAssignment; import org.slf4j.Logger; @@ -49,7 +49,7 @@ * @param The output type of the encapsulated Beam {@link Source}. */ public class FlinkSourceSplitEnumerator - implements SplitEnumeratorCompat, Map>>> { + implements SplitEnumerator, Map>>> { private static final Logger LOG = LoggerFactory.getLogger(FlinkSourceSplitEnumerator.class); private final SplitEnumeratorContext> context; private final Source beamSource; @@ -133,11 +133,6 @@ public void addReader(int subtaskId) { @Override public Map>> snapshotState(long checkpointId) throws Exception { LOG.info("Taking snapshot for checkpoint {}", checkpointId); - return snapshotState(); - } - - @Override - public Map>> snapshotState() throws Exception { return pendingSplits; } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java index 6ba30a8e020a..c7dfe7f6cb78 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java @@ -25,6 +25,7 @@ import java.util.Objects; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -58,6 +59,7 @@ import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; +import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; @@ -276,7 +278,9 @@ private String takeSavepoint(JobID jobID) throws Exception { // try multiple times because the job might not be ready yet for (int i = 0; i < 10; i++) { try { - return MiniClusterCompat.triggerSavepoint(flinkCluster, jobID, null, false).get(); + CompletableFuture savepointFuture = + flinkCluster.triggerSavepoint(jobID, null, false, SavepointFormatType.DEFAULT); + return savepointFuture.get(); } catch (Exception e) { exception = e; LOG.debug("Exception while triggerSavepoint, trying again", e); diff --git a/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java similarity index 100% rename from runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java index a93a7663c451..697ef80adaf7 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java @@ -49,6 +49,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentMatcher; @@ -59,7 +60,7 @@ public class FlinkMetricContainerTest { @Mock private RuntimeContext runtimeContext; - @Mock private MetricGroupWrapper metricGroup; + @Mock private OperatorMetricGroup metricGroup; FlinkMetricContainer container; diff --git a/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java similarity index 100% rename from runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java diff --git a/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java similarity index 100% rename from runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java index 92fcc58a21bf..8a588894acd3 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java @@ -41,7 +41,6 @@ import java.util.stream.LongStream; import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.runners.flink.metrics.FlinkMetricContainer; -import org.apache.beam.runners.flink.metrics.MetricGroupWrapper; import org.apache.beam.runners.flink.streaming.StreamSources; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -58,6 +57,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.StreamSource; @@ -716,7 +716,7 @@ public void testSequentialReadingFromBoundedSource() throws Exception { when(runtimeContextMock.getProcessingTimeService()).thenReturn(processingTimeService); when(runtimeContextMock.getMetricGroup()) - .thenReturn(MetricGroupWrapper.createUnregisteredMetricGroup()); + .thenReturn(UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()); sourceWrapper.setRuntimeContext(runtimeContextMock); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderTestBase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderTestBase.java index bb45c3eadb73..e73874bed660 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderTestBase.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderTestBase.java @@ -191,7 +191,7 @@ public void testNumBytesInMetrics() throws Exception { final int numRecordsPerSplit = 10; List>> splits = createSplits(numSplits, numRecordsPerSplit, 0); - SourceTestCompat.TestMetricGroup testMetricGroup = new SourceTestCompat.TestMetricGroup(); + SourceTestMetrics.TestMetricGroup testMetricGroup = new SourceTestMetrics.TestMetricGroup(); try (SourceReader>> reader = createReader(null, -1L, null, testMetricGroup)) { pollAndValidate(reader, splits, false); @@ -203,7 +203,7 @@ public void testNumBytesInMetrics() throws Exception { public void testMetricsContainer() throws Exception { ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService(); - SourceTestCompat.TestMetricGroup testMetricGroup = new SourceTestCompat.TestMetricGroup(); + SourceTestMetrics.TestMetricGroup testMetricGroup = new SourceTestMetrics.TestMetricGroup(); try (SourceReader>> reader = createReader(executor, 0L, null, testMetricGroup)) { reader.start(); @@ -231,24 +231,24 @@ protected abstract SourceReader>> ScheduledExecutorService executor, long idleTimeoutMs, @Nullable Function timestampExtractor, - SourceTestCompat.TestMetricGroup testMetricGroup); + SourceTestMetrics.TestMetricGroup testMetricGroup); protected abstract Source> createBeamSource( int splitIndex, int numRecordsPerSplit); // ------------------- protected helper methods ---------------------- protected SourceReader>> createReader() { - return createReader(null, -1L, null, new SourceTestCompat.TestMetricGroup()); + return createReader(null, -1L, null, new SourceTestMetrics.TestMetricGroup()); } protected SourceReader>> createReader( Function timestampExtractor) { - return createReader(null, -1L, timestampExtractor, new SourceTestCompat.TestMetricGroup()); + return createReader(null, -1L, timestampExtractor, new SourceTestMetrics.TestMetricGroup()); } protected SourceReader>> createReader( ScheduledExecutorService executor, long idleTimeoutMs) { - return createReader(executor, idleTimeoutMs, null, new SourceTestCompat.TestMetricGroup()); + return createReader(executor, idleTimeoutMs, null, new SourceTestMetrics.TestMetricGroup()); } protected SourceReader>> createReader( @@ -256,7 +256,7 @@ protected SourceReader>> createRe long idleTimeoutMs, Function timestampExtractor) { return createReader( - executor, idleTimeoutMs, timestampExtractor, new SourceTestCompat.TestMetricGroup()); + executor, idleTimeoutMs, timestampExtractor, new SourceTestMetrics.TestMetricGroup()); } protected void pollAndValidate( @@ -315,14 +315,14 @@ protected void verifyBeamReaderClosed(List } protected static SourceReaderContext createSourceReaderContext( - SourceTestCompat.TestMetricGroup metricGroup) { + SourceTestMetrics.TestMetricGroup metricGroup) { SourceReaderContext mockContext = Mockito.mock(SourceReaderContext.class); when(mockContext.metricGroup()).thenReturn(metricGroup); return mockContext; } // -------------------- protected helper class for fetch result validation --------------------- - protected class RecordsValidatingOutput implements SourceTestCompat.ReaderOutputCompat { + protected class RecordsValidatingOutput implements SourceTestMetrics.ReaderOutputCompat { private final List>> sources; private final Map sourceOutputs; private int numCollectedRecords = 0; @@ -390,7 +390,7 @@ public Map createdSourceOutputs() { } } - protected class TestSourceOutput implements SourceTestCompat.SourceOutputCompat { + protected class TestSourceOutput implements SourceTestMetrics.SourceOutputCompat { private final ReaderOutput output; private @Nullable Watermark watermark; private boolean isIdle; diff --git a/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestMetrics.java similarity index 99% rename from runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestMetrics.java index 8cda1341fd22..70cb460d1437 100644 --- a/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestMetrics.java @@ -28,7 +28,7 @@ import org.apache.flink.metrics.groups.SourceReaderMetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; -public class SourceTestCompat { +public class SourceTestMetrics { /** A MetricGroup implementation which records the registered gauge. */ public static class TestMetricGroup extends UnregisteredMetricsGroup diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReaderTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReaderTest.java index 1cd83663b1d4..bf9ea348230a 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReaderTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReaderTest.java @@ -31,7 +31,7 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestBoundedCountingSource; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderTestBase; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.SourceTestCompat.TestMetricGroup; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.SourceTestMetrics.TestMetricGroup; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java index dccadc5e7f24..0a29d4f37ad5 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java @@ -37,7 +37,7 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.EmptyUnboundedSource; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderTestBase; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.SourceTestCompat.TestMetricGroup; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.SourceTestMetrics.TestMetricGroup; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; diff --git a/sdks/go/examples/wasm/README.md b/sdks/go/examples/wasm/README.md index cb0816588502..84d30a3c6a63 100644 --- a/sdks/go/examples/wasm/README.md +++ b/sdks/go/examples/wasm/README.md @@ -68,7 +68,7 @@ cd $BEAM_HOME Expected output should include the following, from which you acquire the latest flink runner version. ```shell -'flink_versions: 1.14,1.15,1.16,1.17' +'flink_versions: 1.15,1.16,1.17,1.18' ``` #### 2. Set to the latest flink runner version i.e. 1.16 diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 93cde0cf2d40..0af32837a3fb 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1591,7 +1591,7 @@ def _add_argparse_args(cls, parser): class FlinkRunnerOptions(PipelineOptions): # These should stay in sync with gradle.properties. - PUBLISHED_FLINK_VERSIONS = ['1.14', '1.15', '1.16', '1.17'] + PUBLISHED_FLINK_VERSIONS = ['1.15', '1.16', '1.17', '1.18'] @classmethod def _add_argparse_args(cls, parser): diff --git a/sdks/typescript/src/apache_beam/runners/flink.ts b/sdks/typescript/src/apache_beam/runners/flink.ts index 3b672b48105c..bedcd7266289 100644 --- a/sdks/typescript/src/apache_beam/runners/flink.ts +++ b/sdks/typescript/src/apache_beam/runners/flink.ts @@ -28,7 +28,7 @@ import { JavaJarService } from "../utils/service"; const MAGIC_HOST_NAMES = ["[local]", "[auto]"]; // These should stay in sync with gradle.properties. -const PUBLISHED_FLINK_VERSIONS = ["1.14", "1.15", "1.16", "1.17"]; +const PUBLISHED_FLINK_VERSIONS = ["1.15", "1.16", "1.17, "1.18""]; const defaultOptions = { flinkMaster: "[local]", diff --git a/settings.gradle.kts b/settings.gradle.kts index 3f0e5d631b69..f8f3bc5c5c34 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -129,10 +129,6 @@ include(":runners:extensions-java:metrics") * verify versions in website/www/site/content/en/documentation/runners/flink.md * verify version in sdks/python/apache_beam/runners/interactive/interactive_beam.py */ -// Flink 1.14 -include(":runners:flink:1.14") -include(":runners:flink:1.14:job-server") -include(":runners:flink:1.14:job-server-container") // Flink 1.15 include(":runners:flink:1.15") include(":runners:flink:1.15:job-server") diff --git a/website/www/site/content/en/documentation/runners/flink.md b/website/www/site/content/en/documentation/runners/flink.md index 94e6036af87b..7325c480955c 100644 --- a/website/www/site/content/en/documentation/runners/flink.md +++ b/website/www/site/content/en/documentation/runners/flink.md @@ -196,10 +196,10 @@ The optional `flink_version` option may be required as well for older versions o {{< paragraph class="language-portable" >}} Starting with Beam 2.18.0, pre-built Flink Job Service Docker images are available at Docker Hub: -[Flink 1.14](https://hub.docker.com/r/apache/beam_flink1.14_job_server). [Flink 1.15](https://hub.docker.com/r/apache/beam_flink1.15_job_server). [Flink 1.16](https://hub.docker.com/r/apache/beam_flink1.16_job_server). [Flink 1.17](https://hub.docker.com/r/apache/beam_flink1.17_job_server). +[Flink 1.18](https://hub.docker.com/r/apache/beam_flink1.18_job_server). {{< /paragraph >}} @@ -326,6 +326,11 @@ To find out which version of Flink is compatible with Beam please see the table Artifact Id Supported Beam Versions + + 1.18.x + beam-runners-flink-1.18 + ≥ 2.57.0 + 1.17.x beam-runners-flink-1.17 @@ -344,7 +349,7 @@ To find out which version of Flink is compatible with Beam please see the table 1.14.x beam-runners-flink-1.14 - ≥ 2.38.0 + 2.38.0 - 2.56.0 1.13.x diff --git a/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html b/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html index 939c64ed9c49..60f1fd39bd13 100644 --- a/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html +++ b/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html @@ -107,6 +107,11 @@ Address of the Flink Master where the Pipeline should be executed. Can either be of the form "host:port" or one of the special values [local], [collection] or [auto]. Default: [auto] + + forceUnalignedCheckpointEnabled + Forces unaligned checkpoints, particularly allowing them for iterative jobs. + Default: false + jobCheckIntervalInSecs Set job check interval in seconds under detached mode in method waitUntilFinish, by default it is 5 seconds @@ -202,4 +207,9 @@ State backend path to persist state backend data. Used to initialize state backend. + + unalignedCheckpointEnabled + If set, Unaligned checkpoints contain in-flight data (i.e., data stored in buffers) as part of the checkpoint state, allowing checkpoint barriers to overtake these buffers. Thus, the checkpoint duration becomes independent of the current throughput as checkpoint barriers are effectively not embedded into the stream of data anymore + Default: false + diff --git a/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html b/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html index eb5c525d78b7..4faad5a994ba 100644 --- a/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html +++ b/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html @@ -107,6 +107,11 @@ Address of the Flink Master where the Pipeline should be executed. Can either be of the form "host:port" or one of the special values [local], [collection] or [auto]. Default: [auto] + + force_unaligned_checkpoint_enabled + Forces unaligned checkpoints, particularly allowing them for iterative jobs. + Default: false + job_check_interval_in_secs Set job check interval in seconds under detached mode in method waitUntilFinish, by default it is 5 seconds @@ -202,4 +207,9 @@ State backend path to persist state backend data. Used to initialize state backend. + + unaligned_checkpoint_enabled + If set, Unaligned checkpoints contain in-flight data (i.e., data stored in buffers) as part of the checkpoint state, allowing checkpoint barriers to overtake these buffers. Thus, the checkpoint duration becomes independent of the current throughput as checkpoint barriers are effectively not embedded into the stream of data anymore + Default: false +