From b34280578ebda5df8c88446cd82e1035cb441b40 Mon Sep 17 00:00:00 2001 From: David Moravek Date: Tue, 4 May 2021 11:56:59 +0200 Subject: [PATCH 1/4] [BEAM-12277] Add flink 1.13 build target. --- gradle.properties | 2 +- runners/flink/1.13/build.gradle | 33 +++++++++ .../1.13/job-server-container/build.gradle | 26 +++++++ runners/flink/1.13/job-server/build.gradle | 31 +++++++++ .../flink/streaming/StreamSources.java | 69 +++++++++++++++++++ settings.gradle.kts | 4 ++ 6 files changed, 164 insertions(+), 1 deletion(-) create mode 100644 runners/flink/1.13/build.gradle create mode 100644 runners/flink/1.13/job-server-container/build.gradle create mode 100644 runners/flink/1.13/job-server/build.gradle create mode 100644 runners/flink/1.13/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java diff --git a/gradle.properties b/gradle.properties index a5c5195bb377..953086f0c5f8 100644 --- a/gradle.properties +++ b/gradle.properties @@ -32,5 +32,5 @@ javaVersion=1.8 docker_image_default_repo_root=apache docker_image_default_repo_prefix=beam_ -flink_versions=1.10,1.11,1.12 +flink_versions=1.10,1.11,1.12,1.13 diff --git a/runners/flink/1.13/build.gradle b/runners/flink/1.13/build.gradle new file mode 100644 index 000000000000..3fce89bc470e --- /dev/null +++ b/runners/flink/1.13/build.gradle @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +def basePath = '..' +/* All properties required for loading the Flink build script */ +project.ext { + // Set the version of all Flink-related dependencies here. + flink_version = '1.13.0' + // Version specific code overrides. + main_source_overrides = ["${basePath}/1.10/src/main/java", "${basePath}/1.11/src/main/java", "${basePath}/1.12/src/main/java", './src/main/java'] + test_source_overrides = ["${basePath}/1.10/src/test/java", "${basePath}/1.11/src/test/java", "${basePath}/1.12/src/main/java", './src/test/java'] + main_resources_overrides = [] + test_resources_overrides = [] + archives_base_name = 'beam-runners-flink-1.13' +} + +// Load the main build script which contains all build logic. +apply from: "$basePath/flink_runner.gradle" diff --git a/runners/flink/1.13/job-server-container/build.gradle b/runners/flink/1.13/job-server-container/build.gradle new file mode 100644 index 000000000000..afdb68a0fc91 --- /dev/null +++ b/runners/flink/1.13/job-server-container/build.gradle @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +def basePath = '../../job-server-container' + +project.ext { + resource_path = basePath +} + +// Load the main build script which contains all build logic. +apply from: "$basePath/flink_job_server_container.gradle" diff --git a/runners/flink/1.13/job-server/build.gradle b/runners/flink/1.13/job-server/build.gradle new file mode 100644 index 000000000000..a7e6fd6eb599 --- /dev/null +++ b/runners/flink/1.13/job-server/build.gradle @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +def basePath = '../../job-server' + +project.ext { + // Look for the source code in the parent module + main_source_dirs = ["$basePath/src/main/java"] + test_source_dirs = ["$basePath/src/test/java"] + main_resources_dirs = ["$basePath/src/main/resources"] + test_resources_dirs = ["$basePath/src/test/resources"] + archives_base_name = 'beam-runners-flink-1.13-job-server' +} + +// Load the main build script which contains all build logic. +apply from: "$basePath/flink_job_server.gradle" diff --git a/runners/flink/1.13/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java b/runners/flink/1.13/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java new file mode 100644 index 000000000000..2707395b194c --- /dev/null +++ b/runners/flink/1.13/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.streaming; + +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; +import org.apache.flink.streaming.runtime.tasks.OperatorChain; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +/** + * {@link StreamSource} utilities, that bridge incompatibilities between Flink releases. + * + *

This change is becauses RecordWriter is wrapped in RecordWriterDelegate in 1.10, please refer + * to https://github.com/apache/flink/commit/2c8b4ef572f05bf4740b7e204af1e5e709cd945c for more + * details. + */ +public class StreamSources { + + /** + * Backward compatibility helper for {@link OneInputTransformation} `getInput` method, that has + * been removed in Flink 1.12. + * + * @param source Source to get single input from. + * @return Input transformation. + */ + public static Transformation getOnlyInput(OneInputTransformation source) { + return Iterables.getOnlyElement(source.getInputs()); + } + + public static > void run( + StreamSource streamSource, + Object lockingObject, + StreamStatusMaintainer streamStatusMaintainer, + Output> collector) + throws Exception { + streamSource.run( + lockingObject, streamStatusMaintainer, collector, createOperatorChain(streamSource)); + } + + private static OperatorChain createOperatorChain(AbstractStreamOperator operator) { + return new OperatorChain<>( + operator.getContainingTask(), + StreamTask.createRecordWriterDelegate( + operator.getOperatorConfig(), new MockEnvironmentBuilder().build())); + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 64444556565e..a6605d95a052 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -72,6 +72,10 @@ include(":runners:flink:1.11:job-server-container") include(":runners:flink:1.12") include(":runners:flink:1.12:job-server") include(":runners:flink:1.12:job-server-container") +// Flink 1.13 +include(":runners:flink:1.13") +include(":runners:flink:1.13:job-server") +include(":runners:flink:1.13:job-server-container") /* End Flink Runner related settings */ include(":runners:twister2") include(":runners:google-cloud-dataflow-java") From 8ad8965149ee9e39af09a3794834518929dfeabc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Wed, 12 May 2021 14:09:26 +0200 Subject: [PATCH 2/4] [BEAM-12277] Make Flink 1.13 the default Flink version to run the tests --- .../jenkins/CommonTestProperties.groovy | 2 +- CHANGES.md | 1 + .../runners/flink/metrics/FileReporter.java | 8 ++++++- .../apache_beam/options/pipeline_options.py | 2 +- .../content/en/documentation/runners/flink.md | 24 ++++++++++++++++--- 5 files changed, 31 insertions(+), 6 deletions(-) diff --git a/.test-infra/jenkins/CommonTestProperties.groovy b/.test-infra/jenkins/CommonTestProperties.groovy index 01ad3b513b97..d7be46453011 100644 --- a/.test-infra/jenkins/CommonTestProperties.groovy +++ b/.test-infra/jenkins/CommonTestProperties.groovy @@ -26,7 +26,7 @@ class CommonTestProperties { } static String getFlinkVersion() { - return "1.12" + return "1.13" } enum Runner { diff --git a/CHANGES.md b/CHANGES.md index 343d0f050e53..2b0781e74a91 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -64,6 +64,7 @@ ## New Features / Improvements * `CREATE FUNCTION` DDL statement added to Calcite SQL syntax. `JAR` and `AGGREGATE` are now reserved keywords. ([BEAM-12339](https://issues.apache.org/jira/browse/BEAM-12339)). +* Flink 1.13 is now supported by the Flink runner ([BEAM-12277](https://issues.apache.org/jira/browse/BEAM-12277)). * X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). ## Breaking Changes diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FileReporter.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FileReporter.java index 35c32fa706fc..030abcbd0836 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FileReporter.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FileReporter.java @@ -69,7 +69,13 @@ public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup final String name = group.getMetricIdentifier(metricName, this); super.notifyOfRemovedMetric(metric, metricName, group); synchronized (this) { - ps.printf("%s: %s%n", name, Metrics.toString(metric)); + try { + ps.printf("%s: %s%n", name, Metrics.toString(metric)); + } catch (NullPointerException e) { + // Workaround to avoid a NPE on Flink's DeclarativeSlotManager during unregister + // TODO Remove once FLINK-22646 is fixed on upstream Flink. + log.warn("unable to log details on metric {}", name); + } } } diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 335cca8cfff1..3e2397a733bd 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1274,7 +1274,7 @@ def _add_argparse_args(cls, parser): class FlinkRunnerOptions(PipelineOptions): # These should stay in sync with gradle.properties. - PUBLISHED_FLINK_VERSIONS = ['1.8', '1.9', '1.10', '1.11', '1.12'] + PUBLISHED_FLINK_VERSIONS = ['1.10', '1.11', '1.12', '1.13'] @classmethod def _add_argparse_args(cls, parser): diff --git a/website/www/site/content/en/documentation/runners/flink.md b/website/www/site/content/en/documentation/runners/flink.md index 4b5e5a445408..bc242806736a 100644 --- a/website/www/site/content/en/documentation/runners/flink.md +++ b/website/www/site/content/en/documentation/runners/flink.md @@ -199,6 +199,7 @@ Starting with Beam 2.18.0, pre-built Flink Job Service Docker images are availab [Flink 1.10](https://hub.docker.com/r/apache/beam_flink1.10_job_server), [Flink 1.11](https://hub.docker.com/r/apache/beam_flink1.11_job_server), [Flink 1.12](https://hub.docker.com/r/apache/beam_flink1.12_job_server). +[Flink 1.13](https://hub.docker.com/r/apache/beam_flink1.13_job_server). {{< /paragraph >}} @@ -311,8 +312,8 @@ reference. ## Flink Version Compatibility The Flink cluster version has to match the minor version used by the FlinkRunner. -The minor version is the first two numbers in the version string, e.g. in `1.12.0` the -minor version is `1.12`. +The minor version is the first two numbers in the version string, e.g. in `1.13.0` the +minor version is `1.13`. We try to track the latest version of Apache Flink at the time of the Beam release. A Flink version is supported by Beam for the time it is supported by the Flink community. @@ -326,7 +327,24 @@ To find out which version of Flink is compatible with Beam please see the table Artifact Id - ≥ 2.30.0 + ≥ 2.31.0 + 1.13.x * + beam-runners-flink-1.13 + + + 1.12.x * + beam-runners-flink-1.12 + + + 1.11.x * + beam-runners-flink-1.11 + + + 1.10.x + beam-runners-flink-1.10 + + + 2.30.0 1.12.x * beam-runners-flink-1.12 From 8be1cb1e3b4567f1474f19f63ae56ce5637566f9 Mon Sep 17 00:00:00 2001 From: Kyle Weaver Date: Wed, 12 May 2021 11:31:39 -0700 Subject: [PATCH 3/4] [BEAM-12277] Update expected metric name formatting. --- .../runners/portability/flink_runner_test.py | 49 ++++++++++--------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py index efdeb4e4a802..1adc278feeb9 100644 --- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py @@ -337,46 +337,47 @@ def process(self, kv, state=beam.DoFn.StateParam(state_spec)): if options.view_as(StandardOptions).streaming: lines_expected.update([ # Gauges for the last finished bundle - 'stateful.beam.metric:statecache:capacity: 123', - 'stateful.beam.metric:statecache:size: 10', - 'stateful.beam.metric:statecache:get: 20', - 'stateful.beam.metric:statecache:miss: 0', - 'stateful.beam.metric:statecache:hit: 20', - 'stateful.beam.metric:statecache:put: 0', - 'stateful.beam.metric:statecache:evict: 0', + 'stateful.beam_metric:statecache:capacity: 123', + 'stateful.beam_metric:statecache:size: 10', + 'stateful.beam_metric:statecache:get: 20', + 'stateful.beam_metric:statecache:miss: 0', + 'stateful.beam_metric:statecache:hit: 20', + 'stateful.beam_metric:statecache:put: 0', + 'stateful.beam_metric:statecache:evict: 0', # Counters - 'stateful.beam.metric:statecache:get_total: 220', - 'stateful.beam.metric:statecache:miss_total: 10', - 'stateful.beam.metric:statecache:hit_total: 210', - 'stateful.beam.metric:statecache:put_total: 10', - 'stateful.beam.metric:statecache:evict_total: 0', + 'stateful.beam_metric:statecache:get_total: 220', + 'stateful.beam_metric:statecache:miss_total: 10', + 'stateful.beam_metric:statecache:hit_total: 210', + 'stateful.beam_metric:statecache:put_total: 10', + 'stateful.beam_metric:statecache:evict_total: 0', ]) else: # Batch has a different processing model. All values for # a key are processed at once. lines_expected.update([ # Gauges - 'stateful).beam.metric:statecache:capacity: 123', + 'stateful).beam_metric:statecache:capacity: 123', # For the first key, the cache token will not be set yet. # It's lazily initialized after first access in StateRequestHandlers - 'stateful).beam.metric:statecache:size: 10', + 'stateful).beam_metric:statecache:size: 10', # We have 11 here because there are 110 / 10 elements per key - 'stateful).beam.metric:statecache:get: 12', - 'stateful).beam.metric:statecache:miss: 1', - 'stateful).beam.metric:statecache:hit: 11', + 'stateful).beam_metric:statecache:get: 12', + 'stateful).beam_metric:statecache:miss: 1', + 'stateful).beam_metric:statecache:hit: 11', # State is flushed back once per key - 'stateful).beam.metric:statecache:put: 1', - 'stateful).beam.metric:statecache:evict: 0', + 'stateful).beam_metric:statecache:put: 1', + 'stateful).beam_metric:statecache:evict: 0', # Counters - 'stateful).beam.metric:statecache:get_total: 120', - 'stateful).beam.metric:statecache:miss_total: 10', - 'stateful).beam.metric:statecache:hit_total: 110', - 'stateful).beam.metric:statecache:put_total: 10', - 'stateful).beam.metric:statecache:evict_total: 0', + 'stateful).beam_metric:statecache:get_total: 120', + 'stateful).beam_metric:statecache:miss_total: 10', + 'stateful).beam_metric:statecache:hit_total: 110', + 'stateful).beam_metric:statecache:put_total: 10', + 'stateful).beam_metric:statecache:evict_total: 0', ]) lines_actual = set() with open(self.test_metrics_path, 'r') as f: for line in f: + print(line, end='') for metric_str in lines_expected: metric_name = metric_str.split()[0] if metric_str in line: From 881da209b26c02b6923d2f3332c70fffae303837 Mon Sep 17 00:00:00 2001 From: Kyle Weaver Date: Thu, 27 May 2021 15:57:20 -0700 Subject: [PATCH 4/4] [BEAM-12277] fix mistake in source overrides --- runners/flink/1.13/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/flink/1.13/build.gradle b/runners/flink/1.13/build.gradle index 3fce89bc470e..ec865d7c5efb 100644 --- a/runners/flink/1.13/build.gradle +++ b/runners/flink/1.13/build.gradle @@ -23,7 +23,7 @@ project.ext { flink_version = '1.13.0' // Version specific code overrides. main_source_overrides = ["${basePath}/1.10/src/main/java", "${basePath}/1.11/src/main/java", "${basePath}/1.12/src/main/java", './src/main/java'] - test_source_overrides = ["${basePath}/1.10/src/test/java", "${basePath}/1.11/src/test/java", "${basePath}/1.12/src/main/java", './src/test/java'] + test_source_overrides = ["${basePath}/1.10/src/test/java", "${basePath}/1.11/src/test/java", "${basePath}/1.12/src/test/java", './src/test/java'] main_resources_overrides = [] test_resources_overrides = [] archives_base_name = 'beam-runners-flink-1.13'